From 1f4c2bb1046d428e67cdb089942f687ae78ad0e1 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Fri, 26 Apr 2013 08:40:46 -0700 Subject: [PATCH] Switch the launcher to Gearman. Remove the Jenkins launcher and add a new Gearman launcher (designed to be compatible with Jenkins) in its place. See the documentation for how to set up the Gearman Plugin for Jenkins. Change-Id: Ie7224396271d7375f4ea42eebb57f883bc291738 --- doc/source/index.rst | 4 +- doc/source/launchers.rst | 230 +++-- doc/source/zuul.rst | 50 +- tests/fixtures/custom_functions.py | 2 + tests/fixtures/layout.yaml | 11 + tests/fixtures/zuul.conf | 6 +- tests/test_scheduler.py | 1377 +++++++++++++++------------- tools/pip-requires | 1 + tox.ini | 4 +- zuul/cmd/server.py | 18 +- zuul/launcher/gearman.py | 387 ++++++++ zuul/launcher/jenkins.py | 499 ---------- zuul/model.py | 1 + zuul/webapp.py | 50 + 14 files changed, 1419 insertions(+), 1221 deletions(-) create mode 100644 tests/fixtures/custom_functions.py create mode 100644 zuul/launcher/gearman.py delete mode 100644 zuul/launcher/jenkins.py create mode 100644 zuul/webapp.py diff --git a/doc/source/index.rst b/doc/source/index.rst index f901fd08f2..039cffa75e 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -10,8 +10,8 @@ Zuul is a program that is used to gate the source code repository of a project so that changes are only merged if they pass tests. The main component of Zuul is the scheduler. It receives events -related to proposed changes (currently from Gerrit), triggers tests -based on those events (currently on Jenkins), and reports back. +related to proposed changes, triggers tests based on those events, and +reports back. Contents: diff --git a/doc/source/launchers.rst b/doc/source/launchers.rst index b347dc8983..a219936e4d 100644 --- a/doc/source/launchers.rst +++ b/doc/source/launchers.rst @@ -1,73 +1,74 @@ :title: Launchers -.. _launchers: +.. _Gearman: http://gearman.org/ +.. _`Gearman Plugin`: + https://wiki.jenkins-ci.org/display/JENKINS/Gearman+Plugin + +.. _launchers: Launchers ========= -Zuul has a modular architecture for launching jobs. Currently only -Jenkins is supported, but it should be fairly easy to add a module to -support other systems. Zuul makes very few assumptions about the -interface to a launcher -- if it can trigger jobs, cancel them, and -receive success or failure reports, it should be able to be used with -Zuul. Patches to this effect are welcome. +Zuul has a modular architecture for launching jobs. Currently, the +only supported module interfaces with Gearman_. This design allows +any system to run jobs for Zuul simply by interfacing with a Gearman +server. The recommended way of integrating a new job-runner with Zuul +is via this method. -Jenkins +If Gearman is unsuitable, Zuul may be extended with a new launcher +module. Zuul makes very few assumptions about the interface to a +launcher -- if it can trigger jobs, cancel them, and receive success +or failure reports, it should be able to be used with Zuul. Patches +to this effect are welcome. + +Gearman ------- -Zuul works with Jenkins using the Jenkins API and the notification -module. It uses the Jenkins API to trigger jobs, passing in -parameters indicating what should be tested. It recieves -notifications on job completion via the notification API (so jobs must -be conifigured to notify Zuul). +Gearman_ is a general-purpose protocol for distributing jobs to any +number of workers. Zuul works with Gearman by sending specific +information with job requests to Gearman, and expects certain +information to be returned on completion. This protocol is described +in `Zuul-Gearman Protocol`_. -Jenkins Configuration -~~~~~~~~~~~~~~~~~~~~~ +The `Gearman Jenkins Plugin`_ makes it easy to use Jenkins with Zuul +by providing an interface between Jenkins and Gearman. In this +configuration, Zuul asks Gearman to run jobs, and Gearman can then +distribute those jobs to any number of Jenkins systems (including +multiple Jenkins masters). -Zuul will need access to a Jenkins user. Create a user in Jenkins, -and then visit the configuration page for the user: +In order for Zuul to run any jobs, you will need a running Gearman +server. The latest version of gearmand from gearman.org is required +in order to support canceling jobs while in the queue. The server is +easy to set up -- just make sure that it allows connections from Zuul +and any workers (e.g., Jenkins masters) on port 4730, and nowhere else +(as the Gearman protocol does not include any provision for +authentication. - https://jenkins.example.com/user/USERNAME/configure +Gearman Jenkins Plugin +---------------------- -And click **Show API Token** to retrieve the API token for that user. -You will need this later when configuring Zuul. Appropriate user -permissions must be set under the Jenkins security matrix: under the -``Global`` group of permissions, check ``Read``, then under the ``Job`` -group of permissions, check ``Read`` and ``Build``. Finally, under -``Run`` check ``Update``. If using a per project matrix, make sure the -user permissions are properly set for any jobs that you want Zuul to -trigger. +The `Gearman Plugin`_ can be installed in Jenkins in order to +facilitate Jenkins running jobs for Zuul. Install the plugin and +configure it with the hostname or IP address of your Gearman server +and the port on which it is listening (4730 by default). It will +automatically register all known Jenkins jobs as functions that Zuul +can invoke via Gearman. -Make sure the notification plugin is installed. Visit the plugin -manager on your jenkins: +Any number of masters can be configured in this way, and Gearman will +distribute jobs to all of them as appropriate. - https://jenkins.example.com/pluginManager/ +No special Jenkins job configuration is needed to support triggering +by Zuul. -And install **Jenkins Notification plugin**. The homepage for the -plugin is at: +Zuul Parameters +--------------- - https://wiki.jenkins-ci.org/display/JENKINS/Notification+Plugin - -Jenkins Job Configuration -~~~~~~~~~~~~~~~~~~~~~~~~~ - -For each job that you want Zuul to trigger, you will need to add a -notification endpoint for the job on that job's configuration page. -Click **Add Endpoint** and enter the following values: - -**Protocol** - ``HTTP`` -**URL** - ``http://127.0.0.1:8001/jenkins_endpoint`` - -If you are running Zuul on a different server than Jenkins, enter the -appropriate URL. Note that Zuul itself has no access controls, so -ensure that only Jenkins is permitted to access that URL. - -Zuul will pass some parameters to Jenkins for every job it launches. -Check **This build is parameterized**, and add the following fields -with the type **String Parameter**: +Zuul will pass some parameters with every job it launches. The +Gearman Plugin will ensure these are supplied as Jenkins build +parameters, so they will be available for use in the job configuration +as well as to the running job as environment variables. They are as +follows: **ZUUL_UUID** Zuul provided key to link builds with Gerrit events @@ -75,27 +76,14 @@ with the type **String Parameter**: Zuul provided ref that includes commit(s) to build **ZUUL_COMMIT** The commit SHA1 at the head of ZUUL_REF - -Those are the only required parameters. The ZUUL_UUID is needed for Zuul to -keep track of the build, and the ZUUL_REF and ZUUL_COMMIT parameters are for -use in preparing the git repo for the build. - -.. note:: - The GERRIT_PROJECT and UUID parameters are deprecated respectively in - favor of ZUUL_PROJECT and ZUUL_UUID. - -The following parameters will be sent for all builds, but are not required so -you do not need to configure Jenkins to accept them if you do not plan on using -them: - **ZUUL_PROJECT** The project that triggered this build **ZUUL_PIPELINE** The Zuul pipeline that is building this job -The following parameters are optional and will only be provided for -builds associated with changes (i.e., in response to patchset-created -or comment-added events): +The following additional parameters will only be provided for builds +associated with changes (i.e., in response to patchset-created or +comment-added events): **ZUUL_BRANCH** The target branch for the change that triggered this build @@ -107,7 +95,7 @@ or comment-added events): **ZUUL_PATCHSET** The Gerrit patchset number for the change that triggered this build -The following parameters are optional and will only be provided for +The following additional parameters will only be provided for post-merge (ref-updated) builds: **ZUUL_OLDREV** @@ -139,7 +127,107 @@ That should be sufficient for a job that only builds a single project. If you have multiple interrelated projects (i.e., they share a Zuul Change Queue) that are built together, you may be able to configure the Git plugin to prepare them, or you may chose to use a shell script -instead. The OpenStack project uses the following script to prepare -the workspace for its integration testing: +instead. As an example, the OpenStack project uses the following +script to prepare the workspace for its integration testing: https://github.com/openstack-infra/devstack-gate/blob/master/devstack-vm-gate-wrap.sh + + +Zuul-Gearman Protocol +--------------------- + +This section is only relevant if you intend to implement a new kind of +worker that runs jobs for Zuul via Gearman. If you just want to use +Jenkins, see `Gearman Jenkins Plugin`_ instead. + +The Zuul protocol as used with Gearman is as follows: + +Starting Builds +~~~~~~~~~~~~~~~ + +To start a build, Zuul invokes a Gearman function with the following +format: + + build:FUNCTION_NAME + +where **FUNCTION_NAME** is the name of the job that should be run. If +the job should run on a specific node (or class of node), Zuul will +instead invoke: + + build:FUNCTION_NAME:NODE_NAME + +where **NODE_NAME** is the name or class of node on which the job +should be run. This can be specified by setting the ZUUL_NODE +parameter in a paremeter-function (see :ref:`zuulconf`). + +Zuul sends the ZUUL_* parameters described in `Zuul Parameters`_ +encoded in JSON format as the argument included with the +SUBMIT_JOB_UNIQ request to Gearman. A unique ID (equal to the +ZUUL_UUID parameter) is also supplied to Gearman, and is accessible as +an added Gearman parameter with GRAB_JOB_UNIQ. + +When a Gearman worker starts running a job for Zuul, it should +immediately send a WORK_DATA packet with the following information +encoded in JSON format: + +**full_url** + The URL with the status or results of the build. Will be used in + the status page and the final report. + +**number** + The build number (unique to this job). + +**master** + A unique identifier associated with the Gearman worker that can + abort this build. See `Stopping Builds`_ for more information. + +It should then immediately send a WORK_STATUS packet with a value of 0 +percent complete. It may then optionally send subsequent WORK_STATUS +packets with updated completion values. + +When the build is complete, it should send a final WORK_DATA packet +with the following in JSON format: + +**result** + Either the string 'SUCCESS' if the job succeeded, or any other value + that describes the result if the job failed. + +Finally, it should send either a WORK_FAIL or WORK_COMPLETE packet as +appropriate. A WORK_EXCEPTION packet will be interpreted as a +WORK_FAIL, but the exception will be logged in Zuul's error log. + +Stopping Builds +~~~~~~~~~~~~~~~ + +If Zuul needs to abort a build already in progress, it will invoke the +following function through Gearman: + + stop:MASTER_NAME + +Where **MASTER_NAME** is the name of the master node supplied in the +initial WORK_DATA packet when the job started. This is used to direct +the stop: function invocation to the correct Gearman worker that is +capable of stopping that particular job. The argument to the function +will be the unique ID of the job that should be stopped. + +The original job is expected to complete with a WORK_DATA and +WORK_FAIL packet as described in `Starting Builds`_. + +Build Descriptions +~~~~~~~~~~~~~~~~~~ + +In order to update the job running system with a description of the +current state of all related builds, the job runner may optionally +implement the following Gearman function: + + set_description:MASTER_NAME + +Where **MASTER_NAME** is used as described in `Stopping Builds`_. The +argument to the function is the following encoded in JSON format: + +**unique_id** + The unique identifier of the build whose description should be + updated. + +**html_description** + The description of the build in HTML format. diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst index 96faf86c5b..4b61507222 100644 --- a/doc/source/zuul.rst +++ b/doc/source/zuul.rst @@ -9,7 +9,8 @@ Configuration Zuul has three configuration files: **zuul.conf** - Credentials for Gerrit and Jenkins, locations of the other config files + Connection information for Gerrit and Gearman, locations of the + other config files **layout.yaml** Project and pipeline configuration -- what Zuul does **logging.conf** @@ -27,30 +28,26 @@ Zuul will look for ``/etc/zuul/zuul.conf`` or ``~/zuul.conf`` to bootstrap its configuration. Alternately, you may specify ``-c /path/to/zuul.conf`` on the command line. -Gerrit and Jenkins credentials are each described in a section of -zuul.conf. The location of the other two configuration files (as well -as the location of the PID file when running Zuul as a server) are -specified in a third section. +Gerrit and Gearman connection information are each described in a +section of zuul.conf. The location of the other two configuration +files (as well as the location of the PID file when running Zuul as a +server) are specified in a third section. The three sections of this config and their options are documented below. You can also find an example zuul.conf file in the git `repository `_ -jenkins +gearman """"""" **server** - URL for the root of the Jenkins HTTP server. - ``server=https://jenkins.example.com`` + Hostname or IP address of the Gearman server. + ``server=gearman.example.com`` -**user** - User to authenticate against Jenkins with. - ``user=jenkins`` - -**apikey** - Jenkins API Key credentials for the above user. - ``apikey=1234567890abcdef1234567890abcdef`` +**port** + Port on which the Gearman server is listening + ``port=4730`` gerrit """""" @@ -65,11 +62,11 @@ gerrit **user** User name to use when logging into above server via ssh. - ``user=jenkins`` + ``user=zuul`` **sshkey** Path to SSH key to use when logging into above server. - ``sshkey=/home/jenkins/.ssh/id_rsa`` + ``sshkey=/home/zuul/.ssh/id_rsa`` zuul """" @@ -107,13 +104,14 @@ zuul **status_url** URL that will be posted in Zuul comments made to Gerrit changes when - beginning Jenkins jobs for a change. - ``status_url=https://jenkins.example.com/zuul/status`` + starting jobs for a change. + ``status_url=https://zuul.example.com/status`` **url_pattern** - If you are storing build logs external to Jenkins and wish to link to - those logs when Zuul makes comments on Gerrit changes for completed - jobs this setting configures what the URLs for those links should be. + If you are storing build logs external to the system that originally + ran jobs and wish to link to those logs when Zuul makes comments on + Gerrit changes for completed jobs this setting configures what the + URLs for those links should be. ``http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}`` layout.yaml @@ -410,13 +408,13 @@ each job as it builds a list from the project specification. **failure-pattern (optional)** The URL that should be reported to Gerrit if the job fails. - Defaults to the Jenkins build URL or the url_pattern configured in + Defaults to the build URL or the url_pattern configured in zuul.conf. May be supplied as a string pattern with substitutions as described in url_pattern in :ref:`zuulconf`. **success-pattern (optional)** The URL that should be reported to Gerrit if the job succeeds. - Defaults to the Jenkins build URL or the url_pattern configured in + Defaults to the build URL or the url_pattern configured in zuul.conf. May be supplied as a string pattern with substitutions as described in url_pattern in :ref:`zuulconf`. @@ -461,6 +459,10 @@ each job as it builds a list from the project specification. :param parameters: parameters to be passed to the job :type parameters: dict + If the parameter **ZUUL_NODE** is set by this function, then it will + be used to specify on what node (or class of node) the job should be + run. + Here is an example of setting the failure message for jobs that check whether a change merges cleanly:: diff --git a/tests/fixtures/custom_functions.py b/tests/fixtures/custom_functions.py new file mode 100644 index 0000000000..e7967225c5 --- /dev/null +++ b/tests/fixtures/custom_functions.py @@ -0,0 +1,2 @@ +def select_debian_node(change, params): + params['ZUUL_NODE'] = 'debian' diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml index ecdd2da59f..8d6c852256 100644 --- a/tests/fixtures/layout.yaml +++ b/tests/fixtures/layout.yaml @@ -1,3 +1,6 @@ +includes: + - python-file: custom_functions.py + pipelines: - name: check manager: IndependentPipelineManager @@ -46,6 +49,8 @@ jobs: - name: project-testfile files: - '.*-requires' + - name: node-project-test1 + parameter-function: select_debian_node project-templates: - name: test-one-and-two @@ -135,3 +140,9 @@ projects: template: - name: test-one-and-two projectname: project + + - name: org/node-project + gate: + - node-project-merge: + - node-project-test1 + - node-project-test2 diff --git a/tests/fixtures/zuul.conf b/tests/fixtures/zuul.conf index c031ae3e13..8948ae8d70 100644 --- a/tests/fixtures/zuul.conf +++ b/tests/fixtures/zuul.conf @@ -1,7 +1,5 @@ -[jenkins] -server=https://jenkins.example.com -user=jenkins -apikey=1234 +[gearman] +server=127.0.0.1 [gerrit] server=review.example.com diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 508af42498..4d3351e311 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -33,10 +33,12 @@ import statsd import shutil import socket import string +from cStringIO import StringIO import git +import gear import zuul.scheduler -import zuul.launcher.jenkins +import zuul.launcher.gearman import zuul.trigger.gerrit FIXTURE_DIR = os.path.join(os.path.dirname(__file__), @@ -54,7 +56,9 @@ GIT_ROOT = os.path.join(TEST_ROOT, "git") CONFIG.set('zuul', 'git_dir', GIT_ROOT) -logging.basicConfig(level=logging.DEBUG) +logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(name)-32s ' + '%(levelname)-8s %(message)s') def random_sha1(): @@ -138,11 +142,15 @@ def ref_has_change(ref, change): def job_has_changes(*args): job = args[0] commits = args[1:] - project = job.parameters['ZUUL_PROJECT'] + if isinstance(job, FakeBuild): + parameters = job.parameters + else: + parameters = json.loads(job.arguments) + project = parameters['ZUUL_PROJECT'] path = os.path.join(GIT_ROOT, project) repo = git.Repo(path) - ref = job.parameters['ZUUL_REF'] - sha = job.parameters['ZUUL_COMMIT'] + ref = parameters['ZUUL_REF'] + sha = parameters['ZUUL_COMMIT'] repo_messages = [c.message.strip() for c in repo.iter_commits(ref)] repo_shas = [c.hexsha for c in repo.iter_commits(ref)] commit_messages = ['%s-1' % commit.subject for commit in commits] @@ -381,240 +389,13 @@ class FakeGerrit(object): pass -class FakeJenkinsEvent(object): - def __init__(self, name, number, parameters, phase, status=None): - data = { - 'build': { - 'full_url': 'https://server/job/%s/%s/' % (name, number), - 'number': number, - 'parameters': parameters, - 'phase': phase, - 'url': 'job/%s/%s/' % (name, number), - }, - 'name': name, - 'url': 'job/%s/' % name, - } - if status: - data['build']['status'] = status - self.body = json.dumps(data) +class BuildHistory(object): + def __init__(self, **kw): + self.__dict__.update(kw) - -class FakeJenkinsJob(threading.Thread): - log = logging.getLogger("zuul.test") - - def __init__(self, jenkins, callback, name, number, parameters): - threading.Thread.__init__(self) - self.jenkins = jenkins - self.callback = callback - self.name = name - self.number = number - self.parameters = parameters - self.wait_condition = threading.Condition() - self.waiting = False - self.aborted = False - self.canceled = False - self.created = time.time() - - def release(self): - self.wait_condition.acquire() - self.wait_condition.notify() - self.waiting = False - self.log.debug("Job %s released" % (self.parameters['UUID'])) - self.wait_condition.release() - - def isWaiting(self): - self.wait_condition.acquire() - if self.waiting: - ret = True - else: - ret = False - self.wait_condition.release() - return ret - - def _wait(self): - self.wait_condition.acquire() - self.waiting = True - self.log.debug("Job %s waiting" % (self.parameters['UUID'])) - self.wait_condition.wait() - self.wait_condition.release() - - def run(self): - self.jenkins.fakeEnqueue(self) - if self.jenkins.hold_jobs_in_queue: - self._wait() - self.jenkins.fakeDequeue(self) - if self.canceled: - self.jenkins.all_jobs.remove(self) - return - self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name, - self.number, - self.parameters, - 'STARTED')) - if self.jenkins.hold_jobs_in_build: - self._wait() - self.log.debug("Job %s continuing" % (self.parameters['UUID'])) - - result = 'SUCCESS' - if (('ZUUL_REF' in self.parameters) and - self.jenkins.fakeShouldFailTest(self.name, - self.parameters['ZUUL_REF'])): - result = 'FAILURE' - if self.aborted: - result = 'ABORTED' - - changes = None - if 'ZUUL_CHANGE_IDS' in self.parameters: - changes = self.parameters['ZUUL_CHANGE_IDS'] - - self.jenkins.fakeAddHistory(name=self.name, number=self.number, - result=result, changes=changes) - self.jenkins.lock.acquire() - self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name, - self.number, - self.parameters, - 'COMPLETED', - result)) - self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name, - self.number, - self.parameters, - 'FINISHED', - result)) - self.jenkins.all_jobs.remove(self) - self.jenkins.lock.release() - - -class FakeJenkins(object): - log = logging.getLogger("zuul.test") - - def __init__(self, *args, **kw): - self.queue = [] - self.all_jobs = [] - self.job_counter = {} - self.queue_counter = 0 - self.job_history = [] - self.hold_jobs_in_queue = False - self.hold_jobs_in_build = False - self.fail_tests = {} - self.nonexistent_jobs = [] - self.lock = threading.Lock() - - def fakeEnqueue(self, job): - self.queue.append(job) - - def fakeDequeue(self, job): - self.queue.remove(job) - - class FakeJobHistory(object): - def __init__(self, **kw): - self.__dict__.update(kw) - - def __repr__(self): - return ("" % - (self.result, self.name, self.number, self.changes)) - - def fakeAddHistory(self, **kw): - self.job_history.append(self.FakeJobHistory(**kw)) - - def fakeRelease(self, regex=None): - all_jobs = self.all_jobs[:] - self.log.debug("releasing jobs %s (%s)" % (regex, len(self.all_jobs))) - for job in all_jobs: - if not regex or re.match(regex, job.name): - self.log.debug("releasing job %s" % (job.parameters['UUID'])) - job.release() - else: - self.log.debug("not releasing job %s" % - (job.parameters['UUID'])) - self.log.debug("done releasing jobs %s (%s)" % (regex, - len(self.all_jobs))) - - def fakeAllWaiting(self, regex=None): - all_jobs = self.all_jobs[:] + self.queue[:] - for job in all_jobs: - self.log.debug("job %s %s" % (job.parameters['UUID'], - job.isWaiting())) - if not job.isWaiting(): - return False - return True - - def fakeAddFailTest(self, name, change): - l = self.fail_tests.get(name, []) - l.append(change) - self.fail_tests[name] = l - - def fakeShouldFailTest(self, name, ref): - l = self.fail_tests.get(name, []) - for change in l: - if ref_has_change(ref, change): - return True - return False - - def build_job(self, name, parameters): - if name in self.nonexistent_jobs: - raise Exception("Job does not exist") - count = self.job_counter.get(name, 0) - count += 1 - self.job_counter[name] = count - - queue_count = self.queue_counter - self.queue_counter += 1 - job = FakeJenkinsJob(self, self.callback, name, count, parameters) - job.queue_id = queue_count - - self.all_jobs.append(job) - job.start() - - def stop_build(self, name, number): - for job in self.all_jobs: - if job.name == name and job.number == number: - job.aborted = True - job.release() - return - - def cancel_queue(self, id): - for job in self.queue: - if job.queue_id == id: - job.canceled = True - job.release() - return - - def get_queue_info(self): - items = [] - for job in self.queue[:]: - self.log.debug("Queue info: %s %s" % (job.name, - job.parameters['UUID'])) - paramstr = '' - paramlst = [] - d = {'actions': [{'parameters': paramlst}, - {'causes': [{'shortDescription': - 'Started by user Jenkins', - 'userId': 'jenkins', - 'userName': 'Jenkins'}]}], - 'blocked': False, - 'buildable': True, - 'buildableStartMilliseconds': (job.created * 1000) + 5, - 'id': job.queue_id, - 'inQueueSince': (job.created * 1000), - 'params': paramstr, - 'stuck': False, - 'task': {'color': 'blue', - 'name': job.name, - 'url': 'https://server/job/%s/' % job.name}, - 'why': 'Waiting for next available executor'} - for k, v in job.parameters.items(): - paramstr += "\n(StringParameterValue) %s='%s'" % (k, v) - pd = {'name': k, 'value': v} - paramlst.append(pd) - items.append(d) - return items - - def set_build_description(self, *args, **kw): - pass - - -class FakeJenkinsCallback(zuul.launcher.jenkins.JenkinsCallback): - def start(self): - pass + def __repr__(self): + return ("" % + (self.result, self.name, self.number, self.changes)) class FakeURLOpener(object): @@ -655,21 +436,253 @@ class FakeStatsd(threading.Thread): def run(self): while True: - read_ready = select.select([self.sock, self.wake_read], [], [])[0] - for sock in read_ready: - if sock is self.sock: + poll = select.poll() + poll.register(self.sock, select.POLLIN) + poll.register(self.wake_read, select.POLLIN) + ret = poll.poll() + for (fd, event) in ret: + if fd == self.sock.fileno(): data = self.sock.recvfrom(1024) if not data: return self.stats.append(data[0]) - else: - # wake_read + if fd == self.wake_read: return def stop(self): os.write(self.wake_write, '1\n') +class FakeBuild(threading.Thread): + log = logging.getLogger("zuul.test") + + def __init__(self, worker, job, number, node): + threading.Thread.__init__(self) + self.worker = worker + self.job = job + self.name = job.name.split(':')[1] + self.number = number + self.node = node + self.parameters = json.loads(job.arguments) + self.unique = self.parameters['ZUUL_UUID'] + self.wait_condition = threading.Condition() + self.waiting = False + self.aborted = False + self.created = time.time() + self.description = '' + + def release(self): + self.wait_condition.acquire() + self.wait_condition.notify() + self.waiting = False + self.log.debug("Build %s released" % self.unique) + self.wait_condition.release() + + def isWaiting(self): + self.wait_condition.acquire() + if self.waiting: + ret = True + else: + ret = False + self.wait_condition.release() + return ret + + def _wait(self): + self.wait_condition.acquire() + self.waiting = True + self.log.debug("Build %s waiting" % self.unique) + self.wait_condition.wait() + self.wait_condition.release() + + def run(self): + data = { + 'full_url': 'https://server/job/%s/%s/' % (self.name, self.number), + 'number': self.number, + 'master': self.worker.worker_id, + } + + self.job.sendWorkData(json.dumps(data)) + self.job.sendWorkStatus(0, 100) + + if self.worker.hold_jobs_in_build: + self._wait() + self.log.debug("Build %s continuing" % self.unique) + + self.worker.lock.acquire() + + result = 'SUCCESS' + if (('ZUUL_REF' in self.parameters) and + self.worker.shouldFailTest(self.name, + self.parameters['ZUUL_REF'])): + result = 'FAILURE' + if self.aborted: + result = 'ABORTED' + + data = {'result': result} + changes = None + if 'ZUUL_CHANGE_IDS' in self.parameters: + changes = self.parameters['ZUUL_CHANGE_IDS'] + + self.worker.build_history.append( + BuildHistory(name=self.name, number=self.number, + result=result, changes=changes, node=self.node, + uuid=self.unique, description=self.description) + ) + + self.job.sendWorkComplete(json.dumps(data)) + del self.worker.gearman_jobs[self.job.unique] + self.worker.running_builds.remove(self) + self.worker.lock.release() + + +class FakeWorker(gear.Worker): + def __init__(self, worker_id): + super(FakeWorker, self).__init__(worker_id) + self.gearman_jobs = {} + self.build_history = [] + self.running_builds = [] + self.build_counter = 0 + self.fail_tests = {} + + self.hold_jobs_in_build = False + self.lock = threading.Lock() + self.__work_thread = threading.Thread(target=self.work) + self.__work_thread.start() + + def handleJob(self, job): + parts = job.name.split(":") + cmd = parts[0] + name = parts[1] + if len(parts) > 2: + node = parts[2] + else: + node = None + if cmd == 'build': + self.handleBuild(job, name, node) + elif cmd == 'stop': + self.handleStop(job, name) + elif cmd == 'set_description': + self.handleSetDescription(job, name) + + def handleBuild(self, job, name, node): + build = FakeBuild(self, job, self.build_counter, node) + job.build = build + self.gearman_jobs[job.unique] = job + self.build_counter += 1 + + self.running_builds.append(build) + build.start() + + def handleStop(self, job, name): + self.log.debug("handle stop") + unique = job.arguments + for build in self.running_builds: + if build.unique == unique: + build.aborted = True + build.release() + job.sendWorkComplete() + return + job.sendWorkFail() + + def handleSetDescription(self, job, name): + self.log.debug("handle set description") + parameters = json.loads(job.arguments) + unique = parameters['unique_id'] + descr = parameters['html_description'] + for build in self.running_builds: + if build.unique == unique: + build.description = descr + job.sendWorkComplete() + return + for build in self.build_history: + if build.uuid == unique: + build.description = descr + job.sendWorkComplete() + return + job.sendWorkFail() + + def work(self): + while self.running: + try: + job = self.getJob() + except gear.InterruptedError: + continue + try: + self.handleJob(job) + except: + self.log.exception("Worker exception:") + + def addFailTest(self, name, change): + l = self.fail_tests.get(name, []) + l.append(change) + self.fail_tests[name] = l + + def shouldFailTest(self, name, ref): + l = self.fail_tests.get(name, []) + for change in l: + if ref_has_change(ref, change): + return True + return False + + def release(self, regex=None): + builds = self.running_builds[:] + self.log.debug("releasing build %s (%s)" % (regex, + len(self.running_builds))) + for build in builds: + if not regex or re.match(regex, build.name): + self.log.debug("releasing build %s" % + (build.parameters['ZUUL_UUID'])) + build.release() + else: + self.log.debug("not releasing build %s" % + (build.parameters['ZUUL_UUID'])) + self.log.debug("done releasing builds %s (%s)" % + (regex, len(self.running_builds))) + + +class FakeGearmanServer(gear.Server): + def __init__(self): + self.hold_jobs_in_queue = False + super(FakeGearmanServer, self).__init__(0) + + def getJobForConnection(self, connection, peek=False): + for job in self.queue: + if not hasattr(job, 'waiting'): + if job.name.startswith('build:'): + job.waiting = self.hold_jobs_in_queue + else: + job.waiting = False + if job.waiting: + continue + if job.name in connection.functions: + if not peek: + self.queue.remove(job) + return job + return None + + def release(self, regex=None): + released = False + queue = self.queue[:] + self.log.debug("releasing queued job %s (%s)" % (regex, + len(self.queue))) + for job in queue: + cmd, name = job.name.split(':') + if cmd != 'build': + continue + if not regex or re.match(regex, name): + self.log.debug("releasing queued job %s" % + job.unique) + job.waiting = False + released = True + else: + self.log.debug("not releasing queued job %s" % + job.unique) + if released: + self.wakeConnections() + self.log.debug("done releasing queued jobs %s (%s)" % + (regex, len(self.queue))) + + class testScheduler(unittest.TestCase): log = logging.getLogger("zuul.test") @@ -688,7 +701,7 @@ class testScheduler(unittest.TestCase): init_repo("org/one-job-project") init_repo("org/nonvoting-project") init_repo("org/templated-project") - self.config = CONFIG + init_repo("org/node-project") self.statsd = FakeStatsd() os.environ['STATSD_HOST'] = 'localhost' @@ -698,25 +711,27 @@ class testScheduler(unittest.TestCase): reload(statsd) reload(zuul.scheduler) + self.gearman_server = FakeGearmanServer() + + self.config = ConfigParser.ConfigParser() + cfg = StringIO() + CONFIG.write(cfg) + cfg.seek(0) + self.config.readfp(cfg) + self.config.set('gearman', 'port', str(self.gearman_server.port)) + + self.worker = FakeWorker('fake_worker') + self.worker.addServer('127.0.0.1', self.gearman_server.port) + self.gearman_server.worker = self.worker + self.sched = zuul.scheduler.Scheduler() - def jenkinsFactory(*args, **kw): - self.fake_jenkins = FakeJenkins() - return self.fake_jenkins - - def jenkinsCallbackFactory(*args, **kw): - self.fake_jenkins_callback = FakeJenkinsCallback(*args, **kw) - return self.fake_jenkins_callback - def URLOpenerFactory(*args, **kw): args = [self.fake_gerrit] + list(args) return FakeURLOpener(*args, **kw) - zuul.launcher.jenkins.ExtendedJenkins = jenkinsFactory - zuul.launcher.jenkins.JenkinsCallback = jenkinsCallbackFactory urllib2.urlopen = URLOpenerFactory - self.jenkins = zuul.launcher.jenkins.Jenkins(self.config, self.sched) - self.fake_jenkins.callback = self.fake_jenkins_callback + self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched) zuul.lib.gerrit.Gerrit = FakeGerrit @@ -725,22 +740,131 @@ class testScheduler(unittest.TestCase): self.gerrit.replication_retry_interval = 0.5 self.fake_gerrit = self.gerrit.gerrit - self.sched.setLauncher(self.jenkins) + self.sched.setLauncher(self.launcher) self.sched.setTrigger(self.gerrit) self.sched.start() self.sched.reconfigure(self.config) self.sched.resume() + self.launcher.gearman.waitForServer() + self.registerJobs() def tearDown(self): - self.jenkins.stop() + self.launcher.stop() + self.worker.shutdown() + self.gearman_server.shutdown() self.gerrit.stop() self.sched.stop() self.sched.join() self.statsd.stop() self.statsd.join() + threads = threading.enumerate() + if len(threads) > 1: + self.log.error("More than one thread is running: %s" % threads) #shutil.rmtree(TEST_ROOT) + def registerJobs(self): + count = 0 + for job in self.sched.jobs.keys(): + self.worker.registerFunction('build:' + job) + count += 1 + self.worker.registerFunction('stop:' + self.worker.worker_id) + count += 1 + + while len(self.gearman_server.functions) < count: + time.sleep(0) + + def release(self, job): + if isinstance(job, FakeBuild): + job.release() + else: + job.waiting = False + self.log.debug("Queued job %s released" % job.unique) + self.gearman_server.wakeConnections() + + def getParameter(self, job, name): + if isinstance(job, FakeBuild): + return job.parameters[name] + else: + parameters = json.loads(job.arguments) + return parameters[name] + + def resetGearmanServer(self): + self.worker.setFunctions([]) + while True: + done = True + for connection in self.gearman_server.active_connections: + if connection.functions: + done = False + if done: + break + time.sleep(0) + self.gearman_server.functions = set() + + def haveAllBuildsReported(self): + # See if Zuul is waiting on a meta job to complete + if self.launcher.meta_jobs: + return False + # Find out if every build that the worker has completed has been + # reported back to Zuul. If it hasn't then that means a Gearman + # event is still in transit and the system is not stable. + for build in self.worker.build_history: + zbuild = self.launcher.builds.get(build.uuid) + if not zbuild: + # It has already been reported + continue + # It hasn't been reported yet. + return False + # Make sure that none of the worker connections are in GRAB_WAIT + for connection in self.worker.active_connections: + if connection.state == 'GRAB_WAIT': + return False + return True + + def areAllBuildsWaiting(self): + ret = True + + builds = self.launcher.builds.values() + for build in builds: + client_job = None + for conn in self.launcher.gearman.active_connections: + for j in conn.related_jobs.values(): + if j.unique == build.uuid: + client_job = j + break + if not client_job: + self.log.debug("%s is not known to the gearman client" % + build) + ret = False + continue + if not client_job.handle: + self.log.debug("%s has no handle" % client_job) + ret = False + continue + server_job = self.gearman_server.jobs.get(client_job.handle) + if not server_job: + self.log.debug("%s is not known to the gearman server" % + client_job) + ret = False + continue + if not hasattr(server_job, 'waiting'): + self.log.debug("%s is being enqueued" % server_job) + ret = False + continue + if server_job.waiting: + continue + worker_job = self.worker.gearman_jobs.get(server_job.unique) + if worker_job: + if worker_job.build.isWaiting(): + continue + else: + self.log.debug("%s is running" % worker_job) + ret = False + else: + self.log.debug("%s is unassigned" % server_job) + ret = False + return ret + def waitUntilSettled(self): self.log.debug("Waiting until settled...") start = time.time() @@ -750,23 +874,25 @@ class testScheduler(unittest.TestCase): print self.sched.trigger_event_queue.empty(), print self.sched.result_event_queue.empty(), print self.fake_gerrit.event_queue.empty(), + print self.areAllBuildsWaiting() raise Exception("Timeout waiting for Zuul to settle") - # Make sure our fake jenkins doesn't end any jobs - # (and therefore, emit events) while we're checking - self.fake_jenkins.lock.acquire() - # Join ensures that the queue is empty _and_ events have been - # processed - self.fake_gerrit.event_queue.join() - self.sched.trigger_event_queue.join() - self.sched.result_event_queue.join() - if (self.sched.trigger_event_queue.empty() and - self.sched.result_event_queue.empty() and - self.fake_gerrit.event_queue.empty() and - self.fake_jenkins.fakeAllWaiting()): - self.fake_jenkins.lock.release() - self.log.debug("...settled.") - return - self.fake_jenkins.lock.release() + # Make sure no new events show up while we're checking + self.worker.lock.acquire() + # have all build states propogated to zuul? + if self.haveAllBuildsReported(): + # Join ensures that the queue is empty _and_ events have been + # processed + self.fake_gerrit.event_queue.join() + self.sched.trigger_event_queue.join() + self.sched.result_event_queue.join() + if (self.sched.trigger_event_queue.empty() and + self.sched.result_event_queue.empty() and + self.fake_gerrit.event_queue.empty() and + self.areAllBuildsWaiting()): + self.worker.lock.release() + self.log.debug("...settled.") + return + self.worker.lock.release() self.sched.wake_event.wait(0.1) def countJobResults(self, jobs, result): @@ -802,18 +928,20 @@ class testScheduler(unittest.TestCase): def test_jobs_launched(self): "Test that jobs are launched and a change is merged" + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('CRVW', 2) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - job_names = [x.name for x in jobs] + job_names = [x.name for x in history] assert 'project-merge' in job_names assert 'project-test1' in job_names assert 'project-test2' in job_names - assert jobs[0].result == 'SUCCESS' - assert jobs[1].result == 'SUCCESS' - assert jobs[2].result == 'SUCCESS' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' + assert history[2].result == 'SUCCESS' assert A.data['status'] == 'MERGED' assert A.reported == 2 self.assertEmptyQueues() @@ -830,7 +958,10 @@ class testScheduler(unittest.TestCase): def test_parallel_changes(self): "Test that changes are tested in parallel and merged in series" - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + + self.worker.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') @@ -843,62 +974,60 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - assert len(jobs) == 1 - assert jobs[0].name == 'project-merge' - assert job_has_changes(jobs[0], A) + assert len(builds) == 1 + assert builds[0].name == 'project-merge' + assert job_has_changes(builds[0], A) - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - assert len(jobs) == 3 - assert jobs[0].name == 'project-test1' - assert job_has_changes(jobs[0], A) - assert jobs[1].name == 'project-test2' - assert job_has_changes(jobs[1], A) - assert jobs[2].name == 'project-merge' - assert job_has_changes(jobs[2], A, B) + assert len(builds) == 3 + assert builds[0].name == 'project-test1' + assert job_has_changes(builds[0], A) + assert builds[1].name == 'project-test2' + assert job_has_changes(builds[1], A) + assert builds[2].name == 'project-merge' + assert job_has_changes(builds[2], A, B) - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - assert len(jobs) == 5 - assert jobs[0].name == 'project-test1' - assert job_has_changes(jobs[0], A) - assert jobs[1].name == 'project-test2' - assert job_has_changes(jobs[1], A) + assert len(builds) == 5 + assert builds[0].name == 'project-test1' + assert job_has_changes(builds[0], A) + assert builds[1].name == 'project-test2' + assert job_has_changes(builds[1], A) - assert jobs[2].name == 'project-test1' - assert job_has_changes(jobs[2], A, B) - assert jobs[3].name == 'project-test2' - assert job_has_changes(jobs[3], A, B) + assert builds[2].name == 'project-test1' + assert job_has_changes(builds[2], A, B) + assert builds[3].name == 'project-test2' + assert job_has_changes(builds[3], A, B) - assert jobs[4].name == 'project-merge' - assert job_has_changes(jobs[4], A, B, C) + assert builds[4].name == 'project-merge' + assert job_has_changes(builds[4], A, B, C) - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - assert len(jobs) == 6 - assert jobs[0].name == 'project-test1' - assert job_has_changes(jobs[0], A) - assert jobs[1].name == 'project-test2' - assert job_has_changes(jobs[1], A) + assert len(builds) == 6 + assert builds[0].name == 'project-test1' + assert job_has_changes(builds[0], A) + assert builds[1].name == 'project-test2' + assert job_has_changes(builds[1], A) - assert jobs[2].name == 'project-test1' - assert job_has_changes(jobs[2], A, B) - assert jobs[3].name == 'project-test2' - assert job_has_changes(jobs[3], A, B) + assert builds[2].name == 'project-test1' + assert job_has_changes(builds[2], A, B) + assert builds[3].name == 'project-test2' + assert job_has_changes(builds[3], A, B) - assert jobs[4].name == 'project-test1' - assert job_has_changes(jobs[4], A, B, C) - assert jobs[5].name == 'project-test2' - assert job_has_changes(jobs[5], A, B, C) + assert builds[4].name == 'project-test1' + assert job_has_changes(builds[4], A, B, C) + assert builds[5].name == 'project-test2' + assert job_has_changes(builds[5], A, B, C) - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() - assert len(jobs) == 0 + assert len(builds) == 0 - jobs = self.fake_jenkins.job_history - assert len(jobs) == 9 + assert len(history) == 9 assert A.data['status'] == 'MERGED' assert B.data['status'] == 'MERGED' assert C.data['status'] == 'MERGED' @@ -909,6 +1038,9 @@ class testScheduler(unittest.TestCase): def test_failed_changes(self): "Test that a change behind a failed change is retested" + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') A.addApproval('CRVW', 2) @@ -917,11 +1049,10 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) - self.fake_jenkins.fakeAddFailTest('project-test1', A) + self.worker.addFailTest('project-test1', A) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - assert len(jobs) > 6 + assert len(history) > 6 assert A.data['status'] == 'NEW' assert B.data['status'] == 'MERGED' assert A.reported == 2 @@ -930,7 +1061,10 @@ class testScheduler(unittest.TestCase): def test_independent_queues(self): "Test that changes end up in the right queues" - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + + self.worker.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C') @@ -942,33 +1076,31 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) - jobs = self.fake_jenkins.all_jobs self.waitUntilSettled() # There should be one merge job at the head of each queue running - assert len(jobs) == 2 - assert jobs[0].name == 'project-merge' - assert job_has_changes(jobs[0], A) - assert jobs[1].name == 'project1-merge' - assert job_has_changes(jobs[1], B) + assert len(builds) == 2 + assert builds[0].name == 'project-merge' + assert job_has_changes(builds[0], A) + assert builds[1].name == 'project1-merge' + assert job_has_changes(builds[1], B) - # Release the current merge jobs - self.fake_jenkins.fakeRelease('.*-merge') + # Release the current merge builds + self.worker.release('.*-merge') self.waitUntilSettled() # Release the merge job for project2 which is behind project1 - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - # All the test jobs should be running: + # All the test builds should be running: # project1 (3) + project2 (3) + project (2) = 8 - assert len(jobs) == 8 + assert len(builds) == 8 - self.fake_jenkins.fakeRelease() + self.worker.release() self.waitUntilSettled() - assert len(jobs) == 0 + assert len(builds) == 0 - jobs = self.fake_jenkins.job_history - assert len(jobs) == 11 + assert len(history) == 11 assert A.data['status'] == 'MERGED' assert B.data['status'] == 'MERGED' assert C.data['status'] == 'MERGED' @@ -979,8 +1111,10 @@ class testScheduler(unittest.TestCase): def test_failed_change_at_head(self): "Test that if a change at the head fails, jobs behind it are canceled" - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + self.worker.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') @@ -988,52 +1122,45 @@ class testScheduler(unittest.TestCase): B.addApproval('CRVW', 2) C.addApproval('CRVW', 2) - self.fake_jenkins.fakeAddFailTest('project-test1', A) + self.worker.addFailTest('project-test1', A) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - assert len(jobs) == 1 - assert jobs[0].name == 'project-merge' - assert job_has_changes(jobs[0], A) + assert len(builds) == 1 + assert builds[0].name == 'project-merge' + assert job_has_changes(builds[0], A) - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - assert len(jobs) == 6 - assert jobs[0].name == 'project-test1' - assert jobs[1].name == 'project-test2' - assert jobs[2].name == 'project-test1' - assert jobs[3].name == 'project-test2' - assert jobs[4].name == 'project-test1' - assert jobs[5].name == 'project-test2' + assert len(builds) == 6 + assert builds[0].name == 'project-test1' + assert builds[1].name == 'project-test2' + assert builds[2].name == 'project-test1' + assert builds[3].name == 'project-test2' + assert builds[4].name == 'project-test1' + assert builds[5].name == 'project-test2' - jobs[0].release() + self.release(builds[0]) self.waitUntilSettled() - assert len(jobs) == 2 # project-test2, project-merge for B - assert self.countJobResults(finished_jobs, 'ABORTED') == 4 + assert len(builds) == 2 # project-test2, project-merge for B + assert self.countJobResults(history, 'ABORTED') == 4 - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() - for x in jobs: - print x - for x in finished_jobs: - print x - - assert len(jobs) == 0 - assert len(finished_jobs) == 15 + assert len(builds) == 0 + assert len(history) == 15 assert A.data['status'] == 'NEW' assert B.data['status'] == 'MERGED' assert C.data['status'] == 'MERGED' @@ -1044,8 +1171,11 @@ class testScheduler(unittest.TestCase): def test_failed_change_at_head_with_queue(self): "Test that if a change at the head fails, queued jobs are canceled" - self.fake_jenkins.hold_jobs_in_queue = True + builds = self.worker.running_builds + history = self.worker.build_history + queue = self.gearman_server.queue + self.gearman_server.hold_jobs_in_queue = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') @@ -1053,51 +1183,47 @@ class testScheduler(unittest.TestCase): B.addApproval('CRVW', 2) C.addApproval('CRVW', 2) - self.fake_jenkins.fakeAddFailTest('project-test1', A) + self.worker.addFailTest('project-test1', A) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - queue = self.fake_jenkins.queue - - assert len(jobs) == 1 + assert len(builds) == 0 assert len(queue) == 1 - assert jobs[0].name == 'project-merge' - assert job_has_changes(jobs[0], A) + assert queue[0].name == 'build:project-merge' + assert job_has_changes(queue[0], A) - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - assert len(jobs) == 6 + assert len(builds) == 0 assert len(queue) == 6 - assert jobs[0].name == 'project-test1' - assert jobs[1].name == 'project-test2' - assert jobs[2].name == 'project-test1' - assert jobs[3].name == 'project-test2' - assert jobs[4].name == 'project-test1' - assert jobs[5].name == 'project-test2' + assert queue[0].name == 'build:project-test1' + assert queue[1].name == 'build:project-test2' + assert queue[2].name == 'build:project-test1' + assert queue[3].name == 'build:project-test2' + assert queue[4].name == 'build:project-test1' + assert queue[5].name == 'build:project-test2' - jobs[0].release() + self.release(queue[0]) self.waitUntilSettled() - assert len(jobs) == 2 # project-test2, project-merge for B - assert len(queue) == 2 - assert self.countJobResults(finished_jobs, 'ABORTED') == 0 + assert len(builds) == 0 + assert len(queue) == 2 # project-test2, project-merge for B + assert self.countJobResults(history, 'ABORTED') == 0 - self.fake_jenkins.hold_jobs_in_queue = False - self.fake_jenkins.fakeRelease() + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() self.waitUntilSettled() - assert len(jobs) == 0 - assert len(finished_jobs) == 11 + assert len(builds) == 0 + assert len(history) == 11 assert A.data['status'] == 'NEW' assert B.data['status'] == 'MERGED' assert C.data['status'] == 'MERGED' @@ -1170,7 +1296,11 @@ class testScheduler(unittest.TestCase): def test_build_configuration(self): "Test that zuul merges the right commits for testing" - self.fake_jenkins.hold_jobs_in_queue = True + builds = self.worker.running_builds + history = self.worker.build_history + queue = self.gearman_server.queue + + self.gearman_server.hold_jobs_in_queue = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') @@ -1182,18 +1312,16 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - ref = jobs[-1].parameters['ZUUL_REF'] - self.fake_jenkins.hold_jobs_in_queue = False - self.fake_jenkins.fakeRelease() + ref = self.getParameter(queue[-1], 'ZUUL_REF') + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() self.waitUntilSettled() path = os.path.join(GIT_ROOT, "org/project") @@ -1206,7 +1334,11 @@ class testScheduler(unittest.TestCase): def test_build_configuration_conflict(self): "Test that merge conflicts are handled" - self.fake_jenkins.hold_jobs_in_queue = True + builds = self.worker.running_builds + history = self.worker.build_history + queue = self.gearman_server.queue + + self.gearman_server.hold_jobs_in_queue = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addPatchset(['conflict']) B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') @@ -1220,17 +1352,15 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - ref = jobs[-1].parameters['ZUUL_REF'] - self.fake_jenkins.hold_jobs_in_queue = False - self.fake_jenkins.fakeRelease() + ref = self.getParameter(queue[-1], 'ZUUL_REF') + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() self.waitUntilSettled() assert A.data['status'] == 'MERGED' @@ -1243,6 +1373,9 @@ class testScheduler(unittest.TestCase): def test_post(self): "Test that post jobs run" + builds = self.worker.running_builds + history = self.worker.build_history + e = { "type": "ref-updated", "submitter": { @@ -1258,15 +1391,18 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(e) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - job_names = [x.name for x in jobs] - assert len(jobs) == 1 + job_names = [x.name for x in history] + assert len(history) == 1 assert 'project-post' in job_names self.assertEmptyQueues() def test_build_configuration_branch(self): "Test that the right commits are on alternate branches" - self.fake_jenkins.hold_jobs_in_queue = True + builds = self.worker.running_builds + history = self.worker.build_history + queue = self.gearman_server.queue + + self.gearman_server.hold_jobs_in_queue = True A = self.fake_gerrit.addFakeChange('org/project', 'mp', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'mp', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'mp', 'C') @@ -1278,17 +1414,15 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - ref = jobs[-1].parameters['ZUUL_REF'] - self.fake_jenkins.hold_jobs_in_queue = False - self.fake_jenkins.fakeRelease() + ref = self.getParameter(queue[-1], 'ZUUL_REF') + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() self.waitUntilSettled() path = os.path.join(GIT_ROOT, "org/project") @@ -1312,7 +1446,11 @@ class testScheduler(unittest.TestCase): def test_build_configuration_multi_branch(self): "Test that dependent changes on multiple branches are merged" - self.fake_jenkins.hold_jobs_in_queue = True + builds = self.worker.running_builds + history = self.worker.build_history + queue = self.gearman_server.queue + + self.gearman_server.hold_jobs_in_queue = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'mp', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') @@ -1324,18 +1462,16 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - ref_mp = jobs[-1].parameters['ZUUL_REF'] - self.fake_jenkins.fakeRelease('.*-merge') + ref_mp = self.getParameter(queue[-1], 'ZUUL_REF') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.gearman_server.release('.*-merge') self.waitUntilSettled() - ref_master = jobs[-1].parameters['ZUUL_REF'] - self.fake_jenkins.hold_jobs_in_queue = False - self.fake_jenkins.fakeRelease() + ref_master = self.getParameter(queue[-1], 'ZUUL_REF') + self.gearman_server.hold_jobs_in_queue = False + self.gearman_server.release() self.waitUntilSettled() path = os.path.join(GIT_ROOT, "org/project") @@ -1366,9 +1502,6 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - assert A.data['status'] == 'MERGED' assert A.reported == 2 assert B.data['status'] == 'MERGED' @@ -1377,20 +1510,25 @@ class testScheduler(unittest.TestCase): def test_job_from_templates_launched(self): "Test whether a job generated via a template can be launched" + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange( 'org/templated-project', 'master', 'A') self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - job_names = [x.name for x in jobs] + job_names = [x.name for x in history] assert 'project-test1' in job_names assert 'project-test2' in job_names - assert jobs[0].result == 'SUCCESS' - assert jobs[1].result == 'SUCCESS' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' def test_dependent_changes_dequeue(self): "Test that dependent patches are not needlessly tested" + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') @@ -1407,7 +1545,7 @@ class testScheduler(unittest.TestCase): B.setDependsOn(A, 1) A.setDependsOn(M1, 1) - self.fake_jenkins.fakeAddFailTest('project-merge', A) + self.worker.addFailTest('project-merge', A) self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) @@ -1415,29 +1553,23 @@ class testScheduler(unittest.TestCase): self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - - for x in jobs: - print x - for x in finished_jobs: - print x - assert A.data['status'] == 'NEW' assert A.reported == 2 assert B.data['status'] == 'NEW' assert B.reported == 2 assert C.data['status'] == 'NEW' assert C.reported == 2 - assert len(finished_jobs) == 1 + assert len(history) == 1 self.assertEmptyQueues() def test_head_is_dequeued_once(self): "Test that if a change at the head fails it is dequeued only once" # If it's dequeued more than once, we should see extra # aborted jobs. - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + self.worker.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project1', 'master', 'C') @@ -1445,52 +1577,50 @@ class testScheduler(unittest.TestCase): B.addApproval('CRVW', 2) C.addApproval('CRVW', 2) - self.fake_jenkins.fakeAddFailTest('project1-test1', A) - self.fake_jenkins.fakeAddFailTest('project1-test2', A) - self.fake_jenkins.fakeAddFailTest('project1-project2-integration', A) + self.worker.addFailTest('project1-test1', A) + self.worker.addFailTest('project1-test2', A) + self.worker.addFailTest('project1-project2-integration', A) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - assert len(jobs) == 1 - assert jobs[0].name == 'project1-merge' - assert job_has_changes(jobs[0], A) + assert len(builds) == 1 + assert builds[0].name == 'project1-merge' + assert job_has_changes(builds[0], A) - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - assert len(jobs) == 9 - assert jobs[0].name == 'project1-test1' - assert jobs[1].name == 'project1-test2' - assert jobs[2].name == 'project1-project2-integration' - assert jobs[3].name == 'project1-test1' - assert jobs[4].name == 'project1-test2' - assert jobs[5].name == 'project1-project2-integration' - assert jobs[6].name == 'project1-test1' - assert jobs[7].name == 'project1-test2' - assert jobs[8].name == 'project1-project2-integration' + assert len(builds) == 9 + assert builds[0].name == 'project1-test1' + assert builds[1].name == 'project1-test2' + assert builds[2].name == 'project1-project2-integration' + assert builds[3].name == 'project1-test1' + assert builds[4].name == 'project1-test2' + assert builds[5].name == 'project1-project2-integration' + assert builds[6].name == 'project1-test1' + assert builds[7].name == 'project1-test2' + assert builds[8].name == 'project1-project2-integration' - jobs[0].release() + self.release(builds[0]) self.waitUntilSettled() - assert len(jobs) == 3 # test2, integration, merge for B - assert self.countJobResults(finished_jobs, 'ABORTED') == 6 + assert len(builds) == 3 # test2, integration, merge for B + assert self.countJobResults(history, 'ABORTED') == 6 - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() - assert len(jobs) == 0 - assert len(finished_jobs) == 20 + assert len(builds) == 0 + assert len(history) == 20 assert A.data['status'] == 'NEW' assert B.data['status'] == 'MERGED' @@ -1502,62 +1632,68 @@ class testScheduler(unittest.TestCase): def test_nonvoting_job(self): "Test that non-voting jobs don't vote." + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/nonvoting-project', 'master', 'A') A.addApproval('CRVW', 2) - self.fake_jenkins.fakeAddFailTest('nonvoting-project-test2', A) + self.worker.addFailTest('nonvoting-project-test2', A) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history assert A.data['status'] == 'MERGED' assert A.reported == 2 - assert finished_jobs[0].result == 'SUCCESS' - assert finished_jobs[1].result == 'SUCCESS' - assert finished_jobs[2].result == 'FAILURE' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' + assert history[2].result == 'FAILURE' self.assertEmptyQueues() def test_check_queue_success(self): "Test successful check queue jobs." + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history assert A.data['status'] == 'NEW' assert A.reported == 1 - assert finished_jobs[0].result == 'SUCCESS' - assert finished_jobs[1].result == 'SUCCESS' - assert finished_jobs[2].result == 'SUCCESS' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' + assert history[2].result == 'SUCCESS' self.assertEmptyQueues() def test_check_queue_failure(self): "Test failed check queue jobs." + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.fake_jenkins.fakeAddFailTest('project-test2', A) + self.worker.addFailTest('project-test2', A) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history assert A.data['status'] == 'NEW' assert A.reported == 1 - assert finished_jobs[0].result == 'SUCCESS' - assert finished_jobs[1].result == 'SUCCESS' - assert finished_jobs[2].result == 'FAILURE' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' + assert history[2].result == 'FAILURE' self.assertEmptyQueues() def test_dependent_behind_dequeue(self): "test that dependent changes behind dequeued changes work" # This complicated test is a reproduction of a real life bug self.sched.reconfigure(self.config) - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + + self.worker.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C') @@ -1574,8 +1710,6 @@ class testScheduler(unittest.TestCase): F.addApproval('CRVW', 2) A.fail_merge = True - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history # Change object re-use in the gerrit trigger is hidden if # changes are added in quick succession; waiting makes it more @@ -1585,9 +1719,9 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() self.fake_gerrit.addEvent(C.addApproval('APRV', 1)) @@ -1599,38 +1733,30 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(F.addApproval('APRV', 1)) self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - for x in jobs: - print x # all jobs running # Grab pointers to the jobs we want to release before # releasing any, because list indexes may change as # the jobs complete. - a, b, c = jobs[:3] + a, b, c = builds[:3] a.release() b.release() c.release() self.waitUntilSettled() - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() - for x in jobs: - print x - for x in finished_jobs: - print x - print self.sched.formatStatusHTML() - assert A.data['status'] == 'NEW' assert B.data['status'] == 'MERGED' assert C.data['status'] == 'MERGED' @@ -1645,24 +1771,26 @@ class testScheduler(unittest.TestCase): assert E.reported == 2 assert F.reported == 2 - assert self.countJobResults(finished_jobs, 'ABORTED') == 15 - assert len(finished_jobs) == 44 + assert self.countJobResults(history, 'ABORTED') == 15 + assert len(history) == 44 self.assertEmptyQueues() def test_merger_repack(self): "Test that the merger works after a repack" + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('CRVW', 2) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - job_names = [x.name for x in jobs] + job_names = [x.name for x in history] assert 'project-merge' in job_names assert 'project-test1' in job_names assert 'project-test2' in job_names - assert jobs[0].result == 'SUCCESS' - assert jobs[1].result == 'SUCCESS' - assert jobs[2].result == 'SUCCESS' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' + assert history[2].result == 'SUCCESS' assert A.data['status'] == 'MERGED' assert A.reported == 2 self.assertEmptyQueues() @@ -1674,14 +1802,13 @@ class testScheduler(unittest.TestCase): A.addApproval('CRVW', 2) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - job_names = [x.name for x in jobs] + job_names = [x.name for x in history] assert 'project-merge' in job_names assert 'project-test1' in job_names assert 'project-test2' in job_names - assert jobs[0].result == 'SUCCESS' - assert jobs[1].result == 'SUCCESS' - assert jobs[2].result == 'SUCCESS' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' + assert history[2].result == 'SUCCESS' assert A.data['status'] == 'MERGED' assert A.reported == 2 self.assertEmptyQueues() @@ -1689,6 +1816,9 @@ class testScheduler(unittest.TestCase): def test_merger_repack_large_change(self): "Test that the merger works with large changes after a repack" # https://bugs.launchpad.net/zuul/+bug/1078946 + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') A.addPatchset(large=True) path = os.path.join(UPSTREAM_ROOT, "org/project1") @@ -1699,22 +1829,25 @@ class testScheduler(unittest.TestCase): A.addApproval('CRVW', 2) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - job_names = [x.name for x in jobs] + job_names = [x.name for x in history] assert 'project1-merge' in job_names assert 'project1-test1' in job_names assert 'project1-test2' in job_names - assert jobs[0].result == 'SUCCESS' - assert jobs[1].result == 'SUCCESS' - assert jobs[2].result == 'SUCCESS' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' + assert history[2].result == 'SUCCESS' assert A.data['status'] == 'MERGED' assert A.reported == 2 self.assertEmptyQueues() def test_nonexistent_job(self): "Test launching a job that doesn't exist" - self.fake_jenkins.nonexistent_jobs.append('project-merge') - self.jenkins.launch_retry_timeout = 0.1 + builds = self.worker.running_builds + history = self.worker.build_history + + # Set to the state immediately after a restart + self.resetGearmanServer() + self.launcher.negative_function_cache_ttl = 0 A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('CRVW', 2) @@ -1722,35 +1855,33 @@ class testScheduler(unittest.TestCase): # There may be a thread about to report a lost change while A.reported < 2: self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - job_names = [x.name for x in jobs] + job_names = [x.name for x in history] assert not job_names assert A.data['status'] == 'NEW' assert A.reported == 2 self.assertEmptyQueues() # Make sure things still work: - self.fake_jenkins.nonexistent_jobs = [] + self.registerJobs() A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('CRVW', 2) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - job_names = [x.name for x in jobs] + job_names = [x.name for x in history] assert 'project-merge' in job_names assert 'project-test1' in job_names assert 'project-test2' in job_names - assert jobs[0].result == 'SUCCESS' - assert jobs[1].result == 'SUCCESS' - assert jobs[2].result == 'SUCCESS' + assert history[0].result == 'SUCCESS' + assert history[1].result == 'SUCCESS' + assert history[2].result == 'SUCCESS' assert A.data['status'] == 'MERGED' assert A.reported == 2 self.assertEmptyQueues() def test_single_nonexistent_post_job(self): "Test launching a single post job that doesn't exist" - self.fake_jenkins.nonexistent_jobs.append('project-post') - self.jenkins.launch_retry_timeout = 0.1 + builds = self.worker.running_builds + history = self.worker.build_history e = { "type": "ref-updated", @@ -1764,18 +1895,23 @@ class testScheduler(unittest.TestCase): "project": "org/project", } } + # Set to the state immediately after a restart + self.resetGearmanServer() + self.launcher.negative_function_cache_ttl = 0 + self.fake_gerrit.addEvent(e) self.waitUntilSettled() - jobs = self.fake_jenkins.job_history - assert len(jobs) == 0 + assert len(history) == 0 self.assertEmptyQueues() def test_new_patchset_dequeues_old(self): "Test that a new patchset causes the old to be dequeued" # D -> C (depends on B) -> B (depends on A) -> A -> M - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + self.worker.hold_jobs_in_build = True M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M') M.setMerged() @@ -1802,18 +1938,10 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2)) self.waitUntilSettled() - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - - for x in jobs: - print x - for x in finished_jobs: - print x - assert A.data['status'] == 'MERGED' assert A.reported == 2 assert B.data['status'] == 'NEW' @@ -1822,14 +1950,16 @@ class testScheduler(unittest.TestCase): assert C.reported == 2 assert D.data['status'] == 'MERGED' assert D.reported == 2 - assert len(finished_jobs) == 9 # 3 each for A, B, D. + assert len(history) == 9 # 3 each for A, B, D. self.assertEmptyQueues() def test_new_patchset_dequeues_old_on_head(self): "Test that a new patchset causes the old to be dequeued (at head)" # D -> C (depends on B) -> B (depends on A) -> A -> M - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + self.worker.hold_jobs_in_build = True M = self.fake_gerrit.addFakeChange('org/project', 'master', 'M') M.setMerged() A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') @@ -1855,18 +1985,10 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2)) self.waitUntilSettled() - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - - for x in jobs: - print x - for x in finished_jobs: - print x - assert A.data['status'] == 'NEW' assert A.reported == 2 assert B.data['status'] == 'NEW' @@ -1875,13 +1997,15 @@ class testScheduler(unittest.TestCase): assert C.reported == 2 assert D.data['status'] == 'MERGED' assert D.reported == 2 - assert len(finished_jobs) == 7 + assert len(history) == 7 self.assertEmptyQueues() def test_new_patchset_dequeues_old_without_dependents(self): "Test that a new patchset causes only the old to be dequeued" - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + self.worker.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') @@ -1898,31 +2022,25 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2)) self.waitUntilSettled() - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - - for x in jobs: - print x - for x in finished_jobs: - print x - assert A.data['status'] == 'MERGED' assert A.reported == 2 assert B.data['status'] == 'NEW' assert B.reported == 2 assert C.data['status'] == 'MERGED' assert C.reported == 2 - assert len(finished_jobs) == 9 + assert len(history) == 9 self.assertEmptyQueues() def test_new_patchset_dequeues_old_independent_queue(self): "Test that a new patchset causes the old to be dequeued (independent)" - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + self.worker.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C') @@ -1935,32 +2053,26 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2)) self.waitUntilSettled() - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - - for x in jobs: - print x - for x in finished_jobs: - print x - assert A.data['status'] == 'NEW' assert A.reported == 1 assert B.data['status'] == 'NEW' assert B.reported == 1 assert C.data['status'] == 'NEW' assert C.reported == 1 - assert len(finished_jobs) == 10 - assert self.countJobResults(finished_jobs, 'ABORTED') == 1 + assert len(history) == 10 + assert self.countJobResults(history, 'ABORTED') == 1 self.assertEmptyQueues() def test_zuul_refs(self): "Test that zuul refs exist and have the right changes" - self.fake_jenkins.hold_jobs_in_build = True + builds = self.worker.running_builds + history = self.worker.build_history + self.worker.hold_jobs_in_build = True M1 = self.fake_gerrit.addFakeChange('org/project1', 'master', 'M1') M1.setMerged() M2 = self.fake_gerrit.addFakeChange('org/project2', 'master', 'M2') @@ -1980,20 +2092,17 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(D.addApproval('APRV', 1)) self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - self.fake_jenkins.fakeRelease('.*-merge') + self.worker.release('.*-merge') self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - a_zref = b_zref = c_zref = d_zref = None - for x in jobs: + for x in builds: if x.parameters['ZUUL_CHANGE'] == '3': a_zref = x.parameters['ZUUL_REF'] if x.parameters['ZUUL_CHANGE'] == '4': @@ -2035,8 +2144,8 @@ class testScheduler(unittest.TestCase): assert ref_has_change(d_zref, C) assert ref_has_change(d_zref, D) - self.fake_jenkins.hold_jobs_in_build = False - self.fake_jenkins.fakeRelease() + self.worker.hold_jobs_in_build = False + self.worker.release() self.waitUntilSettled() assert A.data['status'] == 'MERGED' @@ -2062,6 +2171,9 @@ class testScheduler(unittest.TestCase): def test_file_jobs(self): "Test that file jobs run only when appropriate" + builds = self.worker.running_builds + history = self.worker.build_history + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addPatchset(['pip-requires']) B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') @@ -2071,10 +2183,7 @@ class testScheduler(unittest.TestCase): self.fake_gerrit.addEvent(B.addApproval('APRV', 1)) self.waitUntilSettled() - jobs = self.fake_jenkins.all_jobs - finished_jobs = self.fake_jenkins.job_history - - testfile_jobs = [x for x in finished_jobs + testfile_jobs = [x for x in history if x.name == 'project-testfile'] assert len(testfile_jobs) == 1 @@ -2089,3 +2198,39 @@ class testScheduler(unittest.TestCase): "Test that we can test the config" sched = zuul.scheduler.Scheduler() sched.testConfig(CONFIG.get('zuul', 'layout_config')) + + def test_build_description(self): + "Test that build descriptions update" + builds = self.worker.running_builds + history = self.worker.build_history + + self.worker.registerFunction('set_description:' + + self.worker.worker_id) + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + A.addApproval('CRVW', 2) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + desc = history[0].description + self.log.debug("Description: %s" % desc) + assert re.search("Branch.*master", desc) + assert re.search("Pipeline.*gate", desc) + assert re.search("project-merge.*SUCCESS", desc) + assert re.search("project-test1.*SUCCESS", desc) + assert re.search("project-test2.*SUCCESS", desc) + assert re.search("Reported result.*SUCCESS", desc) + + def test_node_label(self): + "Test that a job runs on a specific node label" + builds = self.worker.running_builds + history = self.worker.build_history + + self.worker.registerFunction('build:node-project-test1:debian') + + A = self.fake_gerrit.addFakeChange('org/node-project', 'master', 'A') + A.addApproval('CRVW', 2) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + assert history[0].node is None + assert history[1].node == 'debian' + assert history[2].node is None diff --git a/tools/pip-requires b/tools/pip-requires index 1ebfd9b7fa..0ef242ca53 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -9,3 +9,4 @@ python-daemon extras statsd>=1.0.0,<3.0 voluptuous>=0.6,<0.7 +http://tarballs.openstack.org/gear/gear-master.tar.gz#egg=gear diff --git a/tox.ini b/tox.ini index e8d5c1a275..0d4969351f 100644 --- a/tox.ini +++ b/tox.ini @@ -7,13 +7,14 @@ setenv = STATSD_HOST=localhost STATSD_PORT=8125 deps = -r{toxinidir}/tools/pip-requires -r{toxinidir}/tools/test-requires -commands = nosetests {posargs} +commands = nosetests --logging-format="%(asctime)s %(name)-32s %(levelname)-8s %(message)s" {posargs} [tox:jenkins] downloadcache = ~/cache/pip [testenv:pep8] deps = pep8==1.3.3 + -r{toxinidir}/tools/pip-requires commands = pep8 --ignore=E123,E125,E128 --repeat --show-source --exclude=.venv,.tox,dist,doc,build . [testenv:cover] @@ -21,6 +22,7 @@ setenv = NOSE_WITH_COVERAGE=1 [testenv:pyflakes] deps = pyflakes + -r{toxinidir}/tools/pip-requires commands = pyflakes zuul setup.py [testenv:venv] diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py index dae9bae1a9..26e496c802 100755 --- a/zuul/cmd/server.py +++ b/zuul/cmd/server.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python # Copyright 2012 Hewlett-Packard Development Company, L.P. # Copyright 2013 OpenStack Foundation # @@ -88,7 +89,7 @@ class Server(object): def test_config(self): # See comment at top of file about zuul imports import zuul.scheduler - import zuul.launcher.jenkins + import zuul.launcher.gearman import zuul.trigger.gerrit logging.basicConfig(level=logging.DEBUG) @@ -98,20 +99,24 @@ class Server(object): def main(self): # See comment at top of file about zuul imports import zuul.scheduler - import zuul.launcher.jenkins + import zuul.launcher.gearman import zuul.trigger.gerrit + import zuul.webapp self.sched = zuul.scheduler.Scheduler() - jenkins = zuul.launcher.jenkins.Jenkins(self.config, self.sched) + gearman = zuul.launcher.gearman.Gearman(self.config, self.sched) gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched) + webapp = zuul.webapp.WebApp(self.sched) - self.sched.setLauncher(jenkins) + self.sched.setLauncher(gearman) self.sched.setTrigger(gerrit) self.sched.start() self.sched.reconfigure(self.config) self.sched.resume() + webapp.start() + signal.signal(signal.SIGHUP, self.reconfigure_handler) signal.signal(signal.SIGUSR1, self.exit_handler) while True: @@ -168,3 +173,8 @@ def main(): with daemon.DaemonContext(pidfile=pid): server.setup_logging() server.main() + + +if __name__ == "__main__": + sys.path.insert(0, '.') + main() diff --git a/zuul/launcher/gearman.py b/zuul/launcher/gearman.py new file mode 100644 index 0000000000..5af6077559 --- /dev/null +++ b/zuul/launcher/gearman.py @@ -0,0 +1,387 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import gear +import json +import logging +import time +import threading +from uuid import uuid4 + +from zuul.model import Build + + +class GearmanCleanup(threading.Thread): + """ A thread that checks to see if outstanding builds have + completed without reporting back. """ + log = logging.getLogger("zuul.JenkinsCleanup") + + def __init__(self, gearman): + threading.Thread.__init__(self) + self.gearman = gearman + self.wake_event = threading.Event() + self._stopped = False + + def stop(self): + self._stopped = True + self.wake_event.set() + + def run(self): + while True: + self.wake_event.wait(300) + if self._stopped: + return + try: + self.gearman.lookForLostBuilds() + except: + self.log.exception("Exception checking builds:") + + +def getJobData(job): + if not len(job.data): + return {} + d = job.data[-1] + if not d: + return {} + return json.loads(d) + + +class ZuulGearmanClient(gear.Client): + def __init__(self, zuul_gearman): + super(ZuulGearmanClient, self).__init__() + self.__zuul_gearman = zuul_gearman + + def handleWorkComplete(self, packet): + job = super(ZuulGearmanClient, self).handleWorkComplete(packet) + self.__zuul_gearman.onBuildCompleted(job) + return job + + def handleWorkFail(self, packet): + job = super(ZuulGearmanClient, self).handleWorkFail(packet) + self.__zuul_gearman.onBuildCompleted(job) + return job + + def handleWorkStatus(self, packet): + job = super(ZuulGearmanClient, self).handleWorkStatus(packet) + self.__zuul_gearman.onWorkStatus(job) + return job + + def handleWorkData(self, packet): + job = super(ZuulGearmanClient, self).handleWorkData(packet) + self.__zuul_gearman.onWorkStatus(job) + return job + + def handleDisconnect(self, job): + job = super(ZuulGearmanClient, self).handleDisconnect(job) + self.__zuul_gearman.onDisconnect(job) + + def handleStatusRes(self, packet): + try: + job = super(ZuulGearmanClient, self).handleStatusRes(packet) + except gear.UnknownJobError: + handle = packet.getArgument(0) + for build in self.__zuul_gearman.builds: + if build.__gearman_job.handle == handle: + self.__zuul_gearman.onUnknownJob(job) + + +class Gearman(object): + log = logging.getLogger("zuul.Gearman") + negative_function_cache_ttl = 5 + + def __init__(self, config, sched): + self.sched = sched + self.builds = {} + self.meta_jobs = {} # A list of meta-jobs like stop or describe + server = config.get('gearman', 'server') + if config.has_option('gearman', 'port'): + port = config.get('gearman', 'port') + else: + port = 4730 + + self.gearman = ZuulGearmanClient(self) + self.gearman.addServer(server, port) + + self.cleanup_thread = GearmanCleanup(self) + self.cleanup_thread.start() + self.function_cache = set() + self.function_cache_time = 0 + + def stop(self): + self.log.debug("Stopping") + self.cleanup_thread.stop() + self.cleanup_thread.join() + self.gearman.shutdown() + self.log.debug("Stopped") + + def isJobRegistered(self, name): + if self.function_cache_time: + for connection in self.gearman.active_connections: + if connection.connect_time > self.function_cache_time: + self.function_cache = set() + self.function_cache_time = 0 + break + if name in self.function_cache: + self.log.debug("Function %s is registered" % name) + return True + if ((time.time() - self.function_cache_time) < + self.negative_function_cache_ttl): + self.log.debug("Function %s is not registered " + "(negative ttl in effect)" % name) + return False + self.function_cache_time = time.time() + for connection in self.gearman.active_connections: + try: + req = gear.StatusAdminRequest() + connection.sendAdminRequest(req) + req.waitForResponse() + except Exception: + self.log.exception("Exception while checking functions") + continue + for line in req.response.split('\n'): + parts = [x.strip() for x in line.split()] + if not parts or parts[0] == '.': + continue + self.function_cache.add(parts[0]) + if name in self.function_cache: + self.log.debug("Function %s is registered" % name) + return True + self.log.debug("Function %s is not registered" % name) + return False + + def launch(self, job, change, pipeline, dependent_changes=[]): + self.log.info("Launch job %s for change %s with dependent changes %s" % + (job, change, dependent_changes)) + dependent_changes = dependent_changes[:] + dependent_changes.reverse() + uuid = str(uuid4().hex) + params = dict(ZUUL_UUID=uuid, + ZUUL_PROJECT=change.project.name) + params['ZUUL_PIPELINE'] = pipeline.name + if hasattr(change, 'refspec'): + changes_str = '^'.join( + ['%s:%s:%s' % (c.project.name, c.branch, c.refspec) + for c in dependent_changes + [change]]) + params['ZUUL_BRANCH'] = change.branch + params['ZUUL_CHANGES'] = changes_str + params['ZUUL_REF'] = ('refs/zuul/%s/%s' % + (change.branch, + change.current_build_set.ref)) + params['ZUUL_COMMIT'] = change.current_build_set.commit + + zuul_changes = ' '.join(['%s,%s' % (c.number, c.patchset) + for c in dependent_changes + [change]]) + params['ZUUL_CHANGE_IDS'] = zuul_changes + params['ZUUL_CHANGE'] = str(change.number) + params['ZUUL_PATCHSET'] = str(change.patchset) + if hasattr(change, 'ref'): + params['ZUUL_REFNAME'] = change.ref + params['ZUUL_OLDREV'] = change.oldrev + params['ZUUL_NEWREV'] = change.newrev + params['ZUUL_SHORT_OLDREV'] = change.oldrev[:7] + params['ZUUL_SHORT_NEWREV'] = change.newrev[:7] + + params['ZUUL_REF'] = change.ref + params['ZUUL_COMMIT'] = change.newrev + + # This is what we should be heading toward for parameters: + + # required: + # ZUUL_UUID + # ZUUL_REF (/refs/zuul/..., /refs/tags/foo, master) + # ZUUL_COMMIT + + # optional: + # ZUUL_PROJECT + # ZUUL_PIPELINE + + # optional (changes only): + # ZUUL_BRANCH + # ZUUL_CHANGE + # ZUUL_CHANGE_IDS + # ZUUL_PATCHSET + + # optional (ref updated only): + # ZUUL_OLDREV + # ZUUL_NEWREV + # ZUUL_SHORT_NEWREV + # ZUUL_SHORT_OLDREV + + if callable(job.parameter_function): + job.parameter_function(change, params) + self.log.debug("Custom parameter function used for job %s, " + "change: %s, params: %s" % (job, change, params)) + + if 'ZUUL_NODE' in params: + name = "build:%s:%s" % (job.name, params['ZUUL_NODE']) + else: + name = "build:%s" % job.name + build = Build(job, uuid) + + gearman_job = gear.Job(name, json.dumps(params), + unique=uuid) + build.__gearman_job = gearman_job + self.builds[uuid] = build + + if not self.isJobRegistered(gearman_job.name): + self.log.error("Job %s is not registered with Gearman" % + gearman_job) + self.onBuildCompleted(gearman_job, 'LOST') + return build + + try: + self.gearman.submitJob(gearman_job) + except Exception: + self.log.exception("Unable to submit job to Gearman") + self.onBuildCompleted(gearman_job, 'LOST') + return build + + gearman_job.waitForHandle(30) + if not gearman_job.handle: + self.log.error("No job handle was received for %s after 30 seconds" + " marking as lost." % + gearman_job) + self.onBuildCompleted(gearman_job, 'LOST') + + return build + + def cancel(self, build): + self.log.info("Cancel build %s for job %s" % (build, build.job)) + + if build.number: + self.log.debug("Build %s has already started" % build) + self.cancelRunningBuild(build) + self.log.debug("Canceled running build %s" % build) + return + else: + self.log.debug("Build %s has not started yet" % build) + + self.log.debug("Looking for build %s in queue" % build) + if self.cancelJobInQueue(build): + self.log.debug("Removed build %s from queue" % build) + return + + self.log.debug("Still unable to find build %s to cancel" % build) + if build.number: + self.log.debug("Build %s has just started" % build) + self.cancelRunningBuild(build) + self.log.debug("Canceled just running build %s" % build) + else: + self.log.error("Build %s has not started but " + "was not found in queue" % build) + + def onBuildCompleted(self, job, result=None): + if job.unique in self.meta_jobs: + del self.meta_jobs[job.unique] + return + + build = self.builds.get(job.unique) + if build: + if result is None: + data = getJobData(job) + result = data.get('result') + self.log.info("Build %s complete, result %s" % + (job, result)) + build.result = result + self.sched.onBuildCompleted(build) + # The test suite expects the build to be removed from the + # internal dict after it's added to the report queue. + del self.builds[job.unique] + else: + if not job.name.startswith("stop:"): + self.log.error("Unable to find build %s" % job.unique) + + def onWorkStatus(self, job): + data = getJobData(job) + self.log.info("Build %s update" % job) + build = self.builds.get(job.unique) + if build: + self.log.debug("Found build %s" % build) + if not build.number: + self.log.info("Build %s started" % job) + build.url = data.get('full_url') + build.number = data.get('number') + build.__gearman_master = data.get('master') + self.sched.onBuildStarted(build) + build.fraction_complete = job.fraction_complete + else: + self.log.error("Unable to find build %s" % job.unique) + + def onDisconnect(self, job): + self.log.info("Gearman job %s lost due to disconnect" % job) + self.onBuildCompleted(job, 'LOST') + + def onUnknownJob(self, job): + self.log.info("Gearman job %s lost due to unknown handle" % job) + self.onBuildCompleted(job, 'LOST') + + def cancelJobInQueue(self, build): + job = build.__gearman_job + + req = gear.CancelJobAdminRequest(job.handle) + job.connection.sendAdminRequest(req) + req.waitForResponse() + self.log.debug("Response to cancel build %s request: %s" % + (build, req.response.strip())) + if req.response.startswith("OK"): + try: + del self.builds[job.unique] + except: + pass + return True + return False + + def cancelRunningBuild(self, build): + stop_uuid = str(uuid4().hex) + stop_job = gear.Job("stop:%s" % build.__gearman_master, + build.uuid, + unique=stop_uuid) + self.meta_jobs[stop_uuid] = stop_job + self.log.debug("Submitting stop job: %s", stop_job) + self.gearman.submitJob(stop_job) + return True + + def setBuildDescription(self, build, desc): + try: + name = "set_description:%s" % build.__gearman_master + except AttributeError: + # We haven't yet received the first data packet that tells + # us where the job is running. + return False + + if not self.isJobRegistered(name): + return False + + desc_uuid = str(uuid4().hex) + data = dict(unique_id=build.uuid, + html_description=desc) + desc_job = gear.Job(name, json.dumps(data), unique=desc_uuid) + self.meta_jobs[desc_uuid] = desc_job + self.log.debug("Submitting describe job: %s", desc_job) + self.gearman.submitJob(desc_job) + return True + + def lookForLostBuilds(self): + self.log.debug("Looking for lost builds") + for build in self.builds.values(): + if build.result: + # The build has finished, it will be removed + continue + job = build.__gearman_job + if not job.handle: + # The build hasn't been enqueued yet + continue + p = gear.Packet(gear.constants.REQ, gear.constants.GET_STATUS, + job.handle) + job.connection.sendPacket(p) diff --git a/zuul/launcher/jenkins.py b/zuul/launcher/jenkins.py deleted file mode 100644 index 7ac92c3e42..0000000000 --- a/zuul/launcher/jenkins.py +++ /dev/null @@ -1,499 +0,0 @@ -# Copyright 2012 Hewlett-Packard Development Company, L.P. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# So we can name this module "jenkins" and still load the "jenkins" -# system module -from __future__ import absolute_import - -import json -import logging -import pprint -import threading -import time -import urllib # for extending jenkins lib -import urllib2 # for extending jenkins lib -from uuid import uuid4 - -import jenkins -from paste import httpserver -from webob import Request - -from zuul.model import Build - -# The amount of time we tolerate a change in build status without -# receiving a notification -JENKINS_GRACE_TIME = 60 - - -class JenkinsCallback(threading.Thread): - log = logging.getLogger("zuul.JenkinsCallback") - - def __init__(self, jenkins): - threading.Thread.__init__(self) - self.jenkins = jenkins - - def run(self): - httpserver.serve(self.app, host='0.0.0.0', port='8001') - - def app(self, environ, start_response): - request = Request(environ) - if request.path == '/jenkins_endpoint': - self.jenkins_endpoint(request) - start_response('200 OK', [('content-type', 'text/html')]) - return ['Zuul good.'] - elif request.path == '/status': - try: - ret = self.jenkins.sched.formatStatusHTML() - except: - self.log.exception("Exception formatting status:") - raise - start_response('200 OK', [('content-type', 'text/html')]) - return [ret] - elif request.path == '/status.json': - try: - ret = self.jenkins.sched.formatStatusJSON() - except: - self.log.exception("Exception formatting status:") - raise - start_response('200 OK', [('content-type', 'application/json'), - ('Access-Control-Allow-Origin', '*')]) - return [ret] - else: - start_response('200 OK', [('content-type', 'text/html')]) - return ['Zuul good.'] - - def jenkins_endpoint(self, request): - try: - data = json.loads(request.body) - except: - self.log.exception("Exception handling Jenkins notification:") - raise # let wsgi handler process the issue - if data: - self.log.debug("Received data from Jenkins: \n%s" % - (pprint.pformat(data))) - build = data.get('build') - if build: - phase = build.get('phase') - status = build.get('status') - url = build.get('full_url') - number = build.get('number') - params = build.get('parameters') - if params: - # UUID is deprecated in favor of ZUUL_UUID - uuid = params.get('ZUUL_UUID') or params.get('UUID') - if (status and url and uuid and phase and - phase == 'COMPLETED'): - self.jenkins.onBuildCompleted(uuid, - status, - url, - number) - if (phase and phase == 'STARTED'): - self.jenkins.onBuildStarted(uuid, url, number) - - -class JenkinsCleanup(threading.Thread): - """ A thread that checks to see if outstanding builds have - completed without reporting back. """ - log = logging.getLogger("zuul.JenkinsCleanup") - - def __init__(self, jenkins): - threading.Thread.__init__(self) - self.jenkins = jenkins - self.wake_event = threading.Event() - self._stopped = False - - def stop(self): - self._stopped = True - self.wake_event.set() - - def run(self): - while True: - self.wake_event.wait(180) - if self._stopped: - return - try: - self.jenkins.lookForLostBuilds() - except: - self.log.exception("Exception checking builds:") - - -STOP_BUILD = 'job/%(name)s/%(number)s/stop' -CANCEL_QUEUE = 'queue/item/%(number)s/cancelQueue' -BUILD_INFO = 'job/%(name)s/%(number)s/api/json?depth=0' -BUILD_DESCRIPTION = 'job/%(name)s/%(number)s/submitDescription' -DEBUG = False - - -class ExtendedJenkins(jenkins.Jenkins): - def jenkins_open(self, req): - ''' - Utility routine for opening an HTTP request to a Jenkins server. - ''' - try: - if self.auth: - req.add_header('Authorization', self.auth) - return urllib2.urlopen(req).read() - except urllib2.HTTPError, e: - if DEBUG: - print e.msg - print e.fp.read() - raise - - def stop_build(self, name, number): - ''' - Stop a running Jenkins build. - - @param name: Name of Jenkins job - @type name: str - @param number: Jenkins build number for the job - @type number: int - ''' - request = urllib2.Request(self.server + STOP_BUILD % locals()) - self.jenkins_open(request) - - def cancel_queue(self, number): - ''' - Cancel a queued build. - - @param number: Jenkins queue number for the build - @type number: int - ''' - # Jenkins returns a 302 from this URL, unless Referer is not set, - # then you get a 404. - request = urllib2.Request(self.server + CANCEL_QUEUE % locals(), - urllib.urlencode({}), - headers={'Referer': self.server}) - self.jenkins_open(request) - - def get_build_info(self, name, number): - ''' - Get information for a build. - - @param name: Name of Jenkins job - @type name: str - @param number: Jenkins build number for the job - @type number: int - @return: dictionary - ''' - request = urllib2.Request(self.server + BUILD_INFO % locals()) - return json.loads(self.jenkins_open(request)) - - def set_build_description(self, name, number, description): - ''' - Get information for a build. - - @param name: Name of Jenkins job - @type name: str - @param number: Jenkins build number for the job - @type number: int - @param description: Bulid description to set - @type description: str - ''' - params = urllib.urlencode({'description': description}) - request = urllib2.Request(self.server + BUILD_DESCRIPTION % locals(), - params) - self.jenkins_open(request) - - -class Jenkins(object): - log = logging.getLogger("zuul.Jenkins") - launch_retry_timeout = 5 - - def __init__(self, config, sched): - self.sched = sched - self.builds = {} - server = config.get('jenkins', 'server') - user = config.get('jenkins', 'user') - apikey = config.get('jenkins', 'apikey') - self.jenkins = ExtendedJenkins(server, user, apikey) - self.callback_thread = JenkinsCallback(self) - self.callback_thread.start() - self.cleanup_thread = JenkinsCleanup(self) - self.cleanup_thread.start() - - def stop(self): - self.cleanup_thread.stop() - self.cleanup_thread.join() - - #TODO: remove dependent_changes - def launch(self, job, change, pipeline, dependent_changes=[]): - self.log.info("Launch job %s for change %s with dependent changes %s" % - (job, change, dependent_changes)) - dependent_changes = dependent_changes[:] - dependent_changes.reverse() - uuid = str(uuid4().hex) - params = dict(UUID=uuid, # deprecated - ZUUL_UUID=uuid, - GERRIT_PROJECT=change.project.name, # deprecated - ZUUL_PROJECT=change.project.name) - params['ZUUL_PIPELINE'] = pipeline.name - if hasattr(change, 'refspec'): - changes_str = '^'.join( - ['%s:%s:%s' % (c.project.name, c.branch, c.refspec) - for c in dependent_changes + [change]]) - params['GERRIT_BRANCH'] = change.branch # deprecated - params['ZUUL_BRANCH'] = change.branch - params['GERRIT_CHANGES'] = changes_str # deprecated - params['ZUUL_CHANGES'] = changes_str - params['ZUUL_REF'] = ('refs/zuul/%s/%s' % - (change.branch, - change.current_build_set.ref)) - params['ZUUL_COMMIT'] = change.current_build_set.commit - - zuul_changes = ' '.join(['%s,%s' % (c.number, c.patchset) - for c in dependent_changes + [change]]) - params['ZUUL_CHANGE_IDS'] = zuul_changes - params['ZUUL_CHANGE'] = str(change.number) - params['ZUUL_PATCHSET'] = str(change.patchset) - if hasattr(change, 'ref'): - params['GERRIT_REFNAME'] = change.ref # deprecated - params['ZUUL_REFNAME'] = change.ref - params['GERRIT_OLDREV'] = change.oldrev # deprecated - params['ZUUL_OLDREV'] = change.oldrev - params['GERRIT_NEWREV'] = change.newrev # deprecated - params['ZUUL_NEWREV'] = change.newrev - params['ZUUL_SHORT_OLDREV'] = change.oldrev[:7] - params['ZUUL_SHORT_NEWREV'] = change.newrev[:7] - - params['ZUUL_REF'] = change.ref - params['ZUUL_COMMIT'] = change.newrev - - # This is what we should be heading toward for parameters: - - # required: - # ZUUL_UUID - # ZUUL_REF (/refs/zuul/..., /refs/tags/foo, master) - # ZUUL_COMMIT - - # optional: - # ZUUL_PROJECT - # ZUUL_PIPELINE - - # optional (changes only): - # ZUUL_BRANCH - # ZUUL_CHANGE - # ZUUL_CHANGE_IDS - # ZUUL_PATCHSET - - # optional (ref updated only): - # ZUUL_OLDREV - # ZUUL_NEWREV - # ZUUL_SHORT_NEWREV - # ZUUL_SHORT_OLDREV - - if callable(job.parameter_function): - job.parameter_function(change, params) - self.log.debug("Custom parameter function used for job %s, " - "change: %s, params: %s" % (job, change, params)) - - build = Build(job, uuid) - # We can get the started notification on another thread before - # this is done so we add the build even before we trigger the - # job on Jenkins. - self.builds[uuid] = build - # Sometimes Jenkins may erroneously respond with a 404. Handle - # that by retrying for 30 seconds. - launched = False - errored = False - for count in range(6): - try: - self.jenkins.build_job(job.name, parameters=params) - launched = True - break - except: - errored = True - self.log.exception("Exception launching build %s for " - "job %s for change %s (will retry):" % - (build, job, change)) - time.sleep(self.launch_retry_timeout) - - if errored: - if launched: - self.log.error("Finally able to launch %s" % build) - else: - self.log.error("Unable to launch %s, even after retrying, " - "declaring lost" % build) - # To keep the queue moving, declare this as a lost build - # so that the change will get dropped. - self.onBuildCompleted(build.uuid, 'LOST', None, None) - return build - - def findBuildInQueue(self, build): - for item in self.jenkins.get_queue_info(): - if 'actions' not in item: - continue - for action in item['actions']: - if 'parameters' not in action: - continue - parameters = action['parameters'] - for param in parameters: - # UUID is deprecated in favor of ZUUL_UUID - if ((param['name'] in ['ZUUL_UUID', 'UUID']) - and build.uuid == param['value']): - return item - return False - - def cancel(self, build): - self.log.info("Cancel build %s for job %s" % (build, build.job)) - if build.number: - self.log.debug("Build %s has already started" % build) - self.jenkins.stop_build(build.job.name, build.number) - self.log.debug("Canceled running build %s" % build) - return - else: - self.log.debug("Build %s has not started yet" % build) - - self.log.debug("Looking for build %s in queue" % build) - item = self.findBuildInQueue(build) - if item: - self.log.debug("Found queue item %s for build %s" % - (item['id'], build)) - try: - self.jenkins.cancel_queue(item['id']) - self.log.debug("Canceled queue item %s for build %s" % - (item['id'], build)) - return - except: - self.log.exception("Exception canceling queue item %s " - "for build %s" % (item['id'], build)) - - self.log.debug("Still unable to find build %s to cancel" % build) - if build.number: - self.log.debug("Build %s has just started" % build) - self.jenkins.stop_build(build.job.name, build.number) - self.log.debug("Canceled just running build %s" % build) - else: - self.log.error("Build %s has not started but " - "was not found in queue" % build) - - def setBuildDescription(self, build, description): - if not build.number: - return - try: - self.jenkins.set_build_description(build.job.name, - build.number, - description) - except: - self.log.exception("Exception setting build description for %s" % - build) - - def onBuildCompleted(self, uuid, status, url, number): - self.log.info("Build %s #%s complete, status %s" % - (uuid, number, status)) - build = self.builds.get(uuid) - if build: - self.log.debug("Found build %s" % build) - del self.builds[uuid] - if url: - build.url = url - build.result = status - build.number = number - self.sched.onBuildCompleted(build) - else: - self.log.error("Unable to find build %s" % uuid) - - def onBuildStarted(self, uuid, url, number): - self.log.info("Build %s #%s started, url: %s" % (uuid, number, url)) - build = self.builds.get(uuid) - if build: - self.log.debug("Found build %s" % build) - build.url = url - build.number = number - self.sched.onBuildStarted(build) - else: - self.log.error("Unable to find build %s" % uuid) - - def lookForLostBuilds(self): - self.log.debug("Looking for lost builds") - lostbuilds = [] - for build in self.builds.values(): - if build.result: - # The build has finished, it will be removed - continue - if build.number: - # The build has started; see if it has finished - try: - info = self.jenkins.get_build_info(build.job.name, - build.number) - if hasattr(build, '_jenkins_missing_build_info'): - del build._jenkins_missing_build_info - except: - self.log.exception("Exception getting info for %s" % build) - # We can't look it up in jenkins. That could be transient. - # If it keeps up, assume it's permanent. - if hasattr(build, '_jenkins_missing_build_info'): - missing_time = build._jenkins_missing_build_info - if time.time() - missing_time > JENKINS_GRACE_TIME: - self.log.debug("Lost build %s because " - "it has started but " - "the build URL is not working" % - build) - lostbuilds.append(build) - else: - build._jenkins_missing_build_info = time.time() - continue - - if not info: - self.log.debug("Lost build %s because " - "it started but " - "info can not be retreived" % build) - lostbuilds.append(build) - continue - if info['building']: - # It has not finished. - continue - if info['duration'] == 0: - # Possible jenkins bug -- not building, but no duration - self.log.debug("Possible jenkins bug with build %s: " - "not building, but no duration is set " - "Build info %s:" % (build, - pprint.pformat(info))) - continue - finish_time = (info['timestamp'] + info['duration']) / 1000 - if time.time() - finish_time > JENKINS_GRACE_TIME: - self.log.debug("Lost build %s because " - "it finished more than 5 minutes ago. " - "Build info %s:" % (build, - pprint.pformat(info))) - lostbuilds.append(build) - continue - # Give it more time - else: - # The build has not started - if time.time() - build.launch_time < JENKINS_GRACE_TIME: - # It just started, give it a bit - continue - info = self.findBuildInQueue(build) - if info: - # It's in the queue. All good. - continue - if build.number: - # We just got notified it started - continue - # It may have just started. If we keep ending up here, - # assume the worst. - if hasattr(build, '_jenkins_missing_from_queue'): - missing_time = build._jenkins_missing_from_queue - if time.time() - missing_time > JENKINS_GRACE_TIME: - self.log.debug("Lost build %s because " - "it has not started and " - "is not in the queue" % build) - lostbuilds.append(build) - continue - else: - build._jenkins_missing_from_queue = time.time() - - for build in lostbuilds: - self.log.error("Declaring %s lost" % build) - self.onBuildCompleted(build.uuid, 'LOST', None, None) diff --git a/zuul/model.py b/zuul/model.py index eb8bc26511..a2ab39e60e 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -457,6 +457,7 @@ class Build(object): self.launch_time = time.time() self.start_time = None self.end_time = None + self.fraction_complete = None def __repr__(self): return '' % (self.uuid, self.job.name) diff --git a/zuul/webapp.py b/zuul/webapp.py new file mode 100644 index 0000000000..1e969ea12a --- /dev/null +++ b/zuul/webapp.py @@ -0,0 +1,50 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading +from paste import httpserver +from webob import Request + + +class WebApp(threading.Thread): + log = logging.getLogger("zuul.WebApp") + + def __init__(self, scheduler): + threading.Thread.__init__(self) + self.scheduler = scheduler + + def run(self): + self.server = httpserver.serve(self.app, host='0.0.0.0', port='8001', + start_loop=False) + self.server.serve_forever() + + def stop(self): + self.server.server_close() + + def app(self, environ, start_response): + request = Request(environ) + if request.path == '/status.json': + try: + ret = self.scheduler.formatStatusJSON() + except: + self.log.exception("Exception formatting status:") + raise + start_response('200 OK', [('content-type', 'application/json'), + ('Access-Control-Allow-Origin', '*')]) + return [ret] + else: + start_response('404 Not Found', [('content-type', 'text/plain')]) + return ['Not found.']