Create ansible job launch server

Create a new server that acts as combined merger and job launcher.
Remove the merge step from the scheduler.
Update the gearman job launcher to target the new server.

Change-Id: I14e3d96cadec6e4b4cca66137071e8ed67f161a1
This commit is contained in:
James E. Blair 2015-12-23 15:26:17 -08:00
parent 2a629ec970
commit f5dbd00712
6 changed files with 329 additions and 104 deletions

View File

@ -48,6 +48,7 @@ import zuul.connection.smtp
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
import zuul.launcher.ansible
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.lib.connections
@ -635,6 +636,23 @@ class FakeBuild(threading.Thread):
self.worker.lock.release()
class RecordingLaunchServer(zuul.launcher.ansible.LaunchServer):
    """A LaunchServer that records the jobs it is asked to launch.

    Used by the test suite in place of the real ansible launch server:
    every gearman job passed to launch() is appended to job_history, and
    the job's sendWorkComplete is wrapped so the completion payload(s)
    are captured on the job object (as job.data) before being sent back
    through gearman.
    """

    def __init__(self, *args, **kw):
        super(RecordingLaunchServer, self).__init__(*args, **kw)
        # Every gearman job handled by launch(), in arrival order; tests
        # search this instead of the old FakeWorker build history.
        self.job_history = []

    def launch(self, job):
        self.job_history.append(job)
        # Completion payloads accumulate here; tests read job.data[-1]
        # to get the final result blob.
        job.data = []

        # Monkey-patch this one job's sendWorkComplete so the data it is
        # completed with is recorded before being forwarded to the real
        # gear.WorkerJob implementation.
        def sendWorkComplete(data=b''):
            job.data.append(data)
            gear.WorkerJob.sendWorkComplete(job, data)
        job.sendWorkComplete = sendWorkComplete
        super(RecordingLaunchServer, self).launch(job)
class FakeWorker(gear.Worker):
def __init__(self, worker_id, test):
super(FakeWorker, self).__init__(worker_id)
@ -925,10 +943,6 @@ class ZuulTestCase(BaseTestCase):
self.config.set('gearman', 'port', str(self.gearman_server.port))
self.worker = FakeWorker('fake_worker', self)
self.worker.addServer('127.0.0.1', self.gearman_server.port)
self.gearman_server.worker = self.worker
zuul.source.gerrit.GerritSource.replication_timeout = 1.5
zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
zuul.connection.gerrit.GerritEventConnector.delay = 0.0
@ -947,6 +961,10 @@ class ZuulTestCase(BaseTestCase):
self.configure_connections(self.sched)
self.sched.registerConnections(self.connections)
self.ansible_server = RecordingLaunchServer(
self.config, self.connections)
self.ansible_server.start()
def URLOpenerFactory(*args, **kw):
if isinstance(args[0], urllib2.Request):
return old_urlopen(*args, **kw)
@ -976,9 +994,6 @@ class ZuulTestCase(BaseTestCase):
self.webapp.start()
self.rpc.start()
self.launcher.gearman.waitForServer()
self.registerJobs()
self.builds = self.worker.running_builds
self.history = self.worker.build_history
self.addCleanup(self.assertFinalState)
self.addCleanup(self.shutdown)
@ -1086,7 +1101,6 @@ class ZuulTestCase(BaseTestCase):
self.merge_server.stop()
self.merge_server.join()
self.merge_client.stop()
self.worker.shutdown()
self.sched.stop()
self.sched.join()
self.statsd.stop()
@ -1183,18 +1197,6 @@ class ZuulTestCase(BaseTestCase):
self.log.debug(" OK")
return True
def registerJobs(self):
    """Register a gearman build function for every job in every
    tenant's layout (plus this worker's stop function), then spin
    until the gearman server has seen them all.
    """
    names = ['build:' + job
             for tenant in self.sched.abide.tenants.values()
             for job in tenant.layout.jobs.keys()]
    names.append('stop:' + self.worker.worker_id)

    for name in names:
        self.worker.registerFunction(name)

    # Registration is asynchronous; busy-wait (yielding the GIL) until
    # the server-side function table has caught up.
    while len(self.gearman_server.functions) < len(names):
        time.sleep(0)
def orderedRelease(self):
# Run one build at a time to ensure non-race order:
while len(self.builds):
@ -1239,15 +1241,15 @@ class ZuulTestCase(BaseTestCase):
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
for build in self.worker.build_history:
zbuild = self.launcher.builds.get(build.uuid)
for job in self.ansible_server.job_history:
zbuild = self.launcher.builds.get(job.unique)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
for connection in self.worker.active_connections:
for connection in self.ansible_server.worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
@ -1311,7 +1313,6 @@ class ZuulTestCase(BaseTestCase):
print self.areAllBuildsWaiting()
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
self.worker.lock.acquire()
# have all build states propagated to zuul?
if self.haveAllBuildsReported():
# Join ensures that the queue is empty _and_ events have been
@ -1323,11 +1324,9 @@ class ZuulTestCase(BaseTestCase):
self.haveAllBuildsReported() and
self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
self.worker.lock.release()
self.log.debug("...settled.")
return
self.sched.run_handler_lock.release()
self.worker.lock.release()
self.sched.wake_event.wait(0.1)
def countJobResults(self, jobs, result):
@ -1335,10 +1334,16 @@ class ZuulTestCase(BaseTestCase):
return len(jobs)
def getJobFromHistory(self, name):
history = self.worker.build_history
history = self.ansible_server.job_history
for job in history:
if job.name == name:
return job
params = json.loads(job.arguments)
if params['job'] == name:
result = json.loads(job.data[-1])
print result
ret = BuildHistory(job=job,
name=params['job'],
result=result['result'])
return ret
raise Exception("Unable to find job %s in history" % name)
def assertEmptyQueues(self):

244
zuul/launcher/ansible.py Normal file
View File

@ -0,0 +1,244 @@
# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import json
import logging
import shutil
import tempfile
import threading
import traceback
import gear
import zuul.merger
class TempDir(object):
    """Context manager owning a throwaway temporary directory.

    The directory is created immediately on construction; entering the
    ``with`` block yields its path, and leaving the block removes the
    whole tree.
    """

    def __init__(self):
        self.tmpdir = tempfile.mkdtemp()

    def __enter__(self):
        # Hand back the path itself, not this manager object.
        return self.tmpdir

    def __exit__(self, exc_type, exc_value, exc_tb):
        shutil.rmtree(self.tmpdir)
        # Implicitly returns None, so any in-flight exception re-raises.
class UpdateTask(object):
    """A request to update one project's git repository.

    Tasks compare equal when they target the same project (the URL is
    deliberately ignored) so that DeduplicateQueue can coalesce
    redundant updates.  Waiters block on wait() until the update thread
    calls setComplete().
    """

    def __init__(self, project, url):
        self.project = project
        self.url = url
        self.event = threading.Event()

    def __eq__(self, other):
        # Guard against comparison with arbitrary objects: the original
        # implementation raised AttributeError on anything without a
        # .project attribute.
        if not isinstance(other, UpdateTask):
            return NotImplemented
        return other.project == self.project

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__; keep them in sync.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def wait(self):
        """Block until setComplete() has been called."""
        self.event.wait()

    def setComplete(self):
        """Mark the update as finished, releasing all waiters."""
        self.event.set()
class DeduplicateQueue(object):
    """A FIFO queue that coalesces equal items.

    Putting an item equal to one already enqueued does not add a
    duplicate; instead the already-queued item is handed back so the
    caller can wait on it.
    """

    def __init__(self):
        self.queue = collections.deque()
        self.condition = threading.Condition()

    def qsize(self):
        return len(self.queue)

    def put(self, item):
        """Enqueue *item* unless an equal item is already queued.

        Returns the item that is actually in the queue: *item* itself
        when it was added, or the pre-existing equivalent otherwise.
        """
        with self.condition:
            existing = next((x for x in self.queue if item == x), None)
            if existing is not None:
                return existing
            self.queue.append(item)
            # Wake one consumer only when something new arrived.
            self.condition.notify()
            return item

    def get(self):
        """Pop the oldest item, blocking until one is available."""
        with self.condition:
            while not self.queue:
                self.condition.wait()
            return self.queue.popleft()
class LaunchServer(object):
    """Combined merger and job-launch gearman worker.

    Registers the 'launcher:launch' and 'merger:cat' gearman functions,
    maintains a shared cache of git repositories kept current by a
    dedicated update thread, and launches jobs (the actual ansible
    execution is still a TODO in this revision).
    """
    log = logging.getLogger("zuul.LaunchServer")

    def __init__(self, config, connections={}):
        # NOTE(review): mutable default for `connections` — harmless as
        # long as callers never mutate it, but worth confirming.
        self.config = config
        self.zuul_url = config.get('merger', 'zuul_url')

        # Root of the long-lived git repo cache; falls back to the
        # traditional default path when unconfigured.
        if self.config.has_option('merger', 'git_dir'):
            self.merge_root = self.config.get('merger', 'git_dir')
        else:
            self.merge_root = '/var/lib/zuul/git'

        # Optional committer identity used for merge commits.
        if self.config.has_option('merger', 'git_user_email'):
            self.merge_email = self.config.get('merger', 'git_user_email')
        else:
            self.merge_email = None

        if self.config.has_option('merger', 'git_user_name'):
            self.merge_name = self.config.get('merger', 'git_user_name')
        else:
            self.merge_name = None

        self.connections = connections
        # Long-lived merger over the shared cache; per-job mergers are
        # created against temporary directories in _launch().
        self.merger = self._getMerger(self.merge_root)
        self.update_queue = DeduplicateQueue()

    def _getMerger(self, root):
        """Return a new Merger rooted at *root*."""
        return zuul.merger.merger.Merger(root, self.connections,
                                         self.merge_email, self.merge_name)

    def start(self):
        """Connect to gearman, register functions, and start the update
        and job-dispatch threads (both daemonized)."""
        self._running = True
        server = self.config.get('gearman', 'server')
        if self.config.has_option('gearman', 'port'):
            # NOTE(review): config.get returns a string here while the
            # fallback is an int; presumably gear normalizes — confirm.
            port = self.config.get('gearman', 'port')
        else:
            port = 4730
        self.worker = gear.Worker('Zuul Launch Server')
        self.worker.addServer(server, port)
        self.log.debug("Waiting for server")
        self.worker.waitForServer()
        self.log.debug("Registering")
        self.register()
        self.log.debug("Starting worker")
        self.update_thread = threading.Thread(target=self._updateLoop)
        self.update_thread.daemon = True
        self.update_thread.start()
        self.thread = threading.Thread(target=self.run)
        self.thread.daemon = True
        self.thread.start()

    def register(self):
        """Register the gearman functions this worker handles."""
        self.worker.registerFunction("launcher:launch")
        # TODOv3: abort
        self.worker.registerFunction("merger:cat")

    def stop(self):
        """Stop accepting work; running loops exit via _running."""
        self.log.debug("Stopping")
        self._running = False
        self.worker.shutdown()
        self.log.debug("Stopped")

    def join(self):
        """Wait for the update and dispatch threads to exit."""
        self.update_thread.join()
        self.thread.join()

    def _updateLoop(self):
        # Keep the update thread alive across unexpected failures; the
        # bare except is deliberate so one bad update can't kill it.
        while self._running:
            try:
                self._innerUpdateLoop()
            except:
                self.log.exception("Exception in update thread:")

    def _innerUpdateLoop(self):
        # Keep the shared repository cache up to date, one (deduplicated)
        # task at a time.  get() blocks until a task is queued.
        task = self.update_queue.get()
        self.log.info("Updating repo %s from %s" % (task.project, task.url))
        self.merger.updateRepo(task.project, task.url)
        self.log.debug("Finished updating repo %s from %s" %
                       (task.project, task.url))
        task.setComplete()

    def update(self, project, url):
        """Enqueue an update of *project*'s repo from *url*.

        Returns the queued task (possibly a pre-existing equivalent one);
        callers wait() on it for completion.
        """
        task = UpdateTask(project, url)
        task = self.update_queue.put(task)
        return task

    def run(self):
        """Main dispatch loop: pull gearman jobs and route them."""
        self.log.debug("Starting launch listener")
        while self._running:
            try:
                job = self.worker.getJob()
                try:
                    if job.name == 'launcher:launch':
                        self.log.debug("Got launch job: %s" % job.unique)
                        self.launch(job)
                    elif job.name == 'merger:cat':
                        self.log.debug("Got cat job: %s" % job.unique)
                        self.cat(job)
                    else:
                        self.log.error("Unable to handle job %s" % job.name)
                        job.sendWorkFail()
                except Exception:
                    # Report job-level failures back through gearman but
                    # keep the dispatch loop running.
                    self.log.exception("Exception while running job")
                    job.sendWorkException(traceback.format_exc())
            except Exception:
                self.log.exception("Exception while getting job")

    def launch(self, job):
        # Each launch runs in its own thread so the dispatch loop can
        # keep accepting work.
        thread = threading.Thread(target=self._launch, args=(job,))
        thread.start()

    def _launch(self, job):
        """Prepare repos and (eventually) run the job.

        Updates every project the job needs, merges the queue items in a
        throwaway git root, then reports a placeholder SUCCESS result —
        the real ansible invocation is not implemented yet.
        """
        self.log.debug("Job %s: beginning" % (job.unique,))
        with TempDir() as root:
            self.log.debug("Job %s: git root at %s" % (job.unique, root))
            args = json.loads(job.arguments)
            # Bring the shared cache up to date for each project first...
            tasks = []
            for project in args['projects']:
                self.log.debug("Job %s: updating project %s" %
                               (job.unique, project['name']))
                tasks.append(self.update(project['name'], project['url']))
            for task in tasks:
                task.wait()
            self.log.debug("Job %s: git updates complete" % (job.unique,))
            # ...then merge the queue items into the per-job temp root.
            merger = self._getMerger(root)
            commit = merger.mergeChanges(args['items'])  # noqa
            # TODOv3: Ansible the ansible thing here.
            # Placeholder status/result payloads until ansible runs.
            data = {
                'url': 'https://server/job',
                'number': 1
            }
            job.sendWorkData(json.dumps(data))
            job.sendWorkStatus(0, 100)
            result = dict(result='SUCCESS')
            job.sendWorkComplete(json.dumps(result))

    def cat(self, job):
        """Handle a 'merger:cat' job: refresh the project's repo, then
        return the requested files' contents from the given branch."""
        args = json.loads(job.arguments)
        task = self.update(args['project'], args['url'])
        task.wait()
        files = self.merger.getFiles(args['project'], args['url'],
                                     args['branch'], args['files'])
        result = dict(updated=True,
                      files=files,
                      zuul_url=self.zuul_url)
        job.sendWorkComplete(json.dumps(result))

View File

@ -25,6 +25,35 @@ import zuul.model
from zuul.model import Build
def make_merger_item(item):
    """Return a dict describing *item* with everything the merger needs.

    Change-based items (those with a change number) carry number and
    patchset; ref-updated items carry oldrev/newrev instead.  Fields
    that do not apply are left as None.
    """
    change = item.change
    source = item.pipeline.source

    number = patchset = oldrev = newrev = None
    if hasattr(change, 'number'):
        number = change.number
        patchset = change.patchset
    elif hasattr(change, 'newrev'):
        oldrev = change.oldrev
        newrev = change.newrev

    return {
        'project': change.project.name,
        'url': source.getGitUrl(change.project),
        'connection_name': source.connection.connection_name,
        'merge_mode': change.project.merge_mode,
        'refspec': change.refspec,
        'branch': change.branch,
        'ref': item.current_build_set.ref,
        'number': number,
        'patchset': patchset,
        'oldrev': oldrev,
        'newrev': newrev,
    }
class GearmanCleanup(threading.Thread):
""" A thread that checks to see if outstanding builds have
completed without reporting back. """
@ -304,7 +333,7 @@ class Gearman(object):
params['ZUUL_REF'] = item.change.ref
params['ZUUL_COMMIT'] = item.change.newrev
# The destination_path is a unqiue path for this build request
# The destination_path is a unique path for this build request
# and generally where the logs are expected to be placed
destination_path = os.path.join(item.change.getBasePath(),
pipeline.name, job.name, uuid[:7])
@ -335,10 +364,21 @@ class Gearman(object):
# ZUUL_OLDREV
# ZUUL_NEWREV
if 'ZUUL_NODE' in params:
name = "build:%s:%s" % (job.name, params['ZUUL_NODE'])
else:
name = "build:%s" % job.name
all_items = dependent_items + [item]
merger_items = map(make_merger_item, all_items)
params['job'] = job.name
params['items'] = merger_items
params['projects'] = []
projects = set()
for item in all_items:
if item.change.project not in projects:
params['projects'].append(
dict(name=item.change.project.name,
url=item.pipeline.source.getGitUrl(
item.change.project)))
projects.add(item.change.project)
build = Build(job, uuid)
build.parameters = params
@ -346,7 +386,7 @@ class Gearman(object):
self.sched.onBuildCompleted(build, 'SUCCESS')
return build
gearman_job = gear.Job(name, json.dumps(params),
gearman_job = gear.Job('launcher:launch', json.dumps(params),
unique=uuid)
build.__gearman_job = gearman_job
self.builds[uuid] = build

View File

@ -329,61 +329,6 @@ class BasePipelineManager(object):
self.dequeueItem(item)
self.reportStats(item)
def _makeMergerItem(self, item):
# Create a dictionary with all info about the item needed by
# the merger.
number = None
patchset = None
oldrev = None
newrev = None
if hasattr(item.change, 'number'):
number = item.change.number
patchset = item.change.patchset
elif hasattr(item.change, 'newrev'):
oldrev = item.change.oldrev
newrev = item.change.newrev
connection_name = self.pipeline.source.connection.connection_name
return dict(project=item.change.project.name,
url=self.pipeline.source.getGitUrl(
item.change.project),
connection_name=connection_name,
merge_mode=item.change.project.merge_mode,
refspec=item.change.refspec,
branch=item.change.branch,
ref=item.current_build_set.ref,
number=number,
patchset=patchset,
oldrev=oldrev,
newrev=newrev,
)
def prepareRef(self, item):
    """Kick off (or check on) merge/repo preparation for *item*.

    Returns True once the item's ref is ready to build against, False
    otherwise.  The first call for a build set moves merge_state to
    PENDING and asynchronously asks the merger either to merge the
    change(s) (items with a refspec) or just to update the project's
    repo (ref-updated events); a later scheduler pass sees COMPLETE
    and returns True.
    """
    build_set = item.current_build_set
    if build_set.merge_state == build_set.COMPLETE:
        return True
    if build_set.merge_state == build_set.PENDING:
        # A merge/update request is already outstanding; don't resubmit.
        return False
    build_set.merge_state = build_set.PENDING
    ref = build_set.ref
    if hasattr(item.change, 'refspec') and not ref:
        # Change-based item: merge this change on top of the items
        # ahead of it in the queue (oldest first — hence reverse()).
        self.log.debug("Preparing ref for: %s" % item.change)
        item.current_build_set.setConfiguration()
        dependent_items = self.getDependentItems(item)
        dependent_items.reverse()
        all_items = dependent_items + [item]
        merger_items = map(self._makeMergerItem, all_items)
        self.sched.merger.mergeChanges(merger_items,
                                       item.current_build_set,
                                       self.pipeline.precedence)
    else:
        # Ref-updated (push/tag) item: nothing to merge, just make sure
        # the local repository is current.
        self.log.debug("Preparing update repo for: %s" % item.change)
        url = self.pipeline.source.getGitUrl(item.change.project)
        self.sched.merger.updateRepo(item.change.project.name,
                                     url, build_set,
                                     self.pipeline.precedence)
    return False
def _launchJobs(self, item, jobs):
self.log.debug("Launching jobs for change %s" % item.change)
dependent_items = self.getDependentItems(item)
@ -453,7 +398,6 @@ class BasePipelineManager(object):
dep_items = self.getFailingDependentItems(item)
actionable = change_queue.isActionable(item)
item.active = actionable
ready = False
if dep_items:
failing_reasons.append('a needed change is failing')
self.cancelJobs(item, prime=False)
@ -471,12 +415,10 @@ class BasePipelineManager(object):
change_queue.moveItem(item, nnfi)
changed = True
self.cancelJobs(item)
if actionable:
ready = self.prepareRef(item)
if item.current_build_set.unable_to_merge:
failing_reasons.append("it has a merge conflict")
ready = False
if actionable and ready and self.launchJobs(item):
if actionable:
if not item.current_build_set.ref:
item.current_build_set.setConfiguration()
if actionable and self.launchJobs(item):
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
@ -610,9 +552,6 @@ class BasePipelineManager(object):
actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS')
self.pipeline._consecutive_failures = 0
elif not self.pipeline.didMergerSucceed(item):
actions = self.pipeline.merge_failure_actions
item.setReportedResult('MERGER_FAILURE')
else:
actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')

View File

@ -235,8 +235,7 @@ class Pipeline(object):
item.removeBuild(build)
elif build.result != 'SUCCESS':
# Get a JobTree from a Job so we can find only its dependent jobs
root = self.getJobTree(item.change.project)
tree = root.getJobTreeForJob(build.job)
tree = item.job_tree.getJobTreeForJob(build.job)
for job in tree.getJobs():
fakebuild = Build(job, None)
fakebuild.result = 'SKIPPED'

View File

@ -507,8 +507,6 @@ class Scheduler(threading.Thread):
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
waiting = False
if self.merger.areMergesOutstanding():
waiting = True
for pipeline in self.layout.pipelines.values():
for item in pipeline.getAllItems():
for build in item.current_build_set.getBuilds():