Annotate builds with event id

It's useful to be able to trace an event through the system,
including the builds.

Change-Id: If852cbe8aecc4cf346dccc1b8fc34272c8ff483d
Author: Tobias Henkel
Date:   2019-05-13 20:50:56 +02:00
Parent: 1f8ec8499f
Commit: 6f3bcdd6b6
8 changed files with 93 additions and 66 deletions
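
This change threads the event id (and the build uuid) through the executor,
merger, and scheduler, replacing bare self.log calls with a logger returned
by get_annotated_logger from zuul.lib.logutil. As background, here is a
minimal sketch of what such a helper can look like, assuming it wraps the
given logger in a logging.LoggerAdapter that prefixes every message with the
ids; the EventIdLogAdapter name and the prefix format are illustrative, not
necessarily Zuul's actual implementation:

import logging


class EventIdLogAdapter(logging.LoggerAdapter):
    """Prefix each message with the event id and build uuid, if known."""

    def process(self, msg, kwargs):
        prefix = ''
        event_id = self.extra.get('event_id')
        build = self.extra.get('build')
        if event_id is not None:
            prefix += '[e: %s] ' % event_id
        if build is not None:
            prefix += '[build: %s] ' % build
        return prefix + msg, kwargs


def get_annotated_logger(logger, event, build=None):
    # 'event' may be an event object carrying a zuul_event_id attribute,
    # or a bare event id string; both call styles appear in the diffs below.
    event_id = getattr(event, 'zuul_event_id', event)
    return EventIdLogAdapter(logger, {'event_id': event_id, 'build': build})

With an adapter like this, every line logged on behalf of an event carries
the same id, so grepping the combined scheduler, merger, and executor logs
for that id reconstructs the event's path through the system.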

tests/unit/test_scheduler.py

@@ -6586,7 +6586,7 @@ class TestSemaphore(ZuulTestCase):
         # Simulate a single zk error in useNodeSet
         orig_useNodeSet = self.nodepool.useNodeSet

-        def broken_use_nodeset(nodeset, build_set=None):
+        def broken_use_nodeset(nodeset, build_set=None, event=None):
             # restore original useNodeSet
             self.nodepool.useNodeSet = orig_useNodeSet
             raise NoNodeError()

zuul/executor/client.py

@@ -24,6 +24,7 @@ import zuul.model
 from zuul.lib.config import get_default
 from zuul.lib.gear_utils import getGearmanFunctions
 from zuul.lib.jsonutil import json_dumps
+from zuul.lib.logutil import get_annotated_logger
 from zuul.model import Build

@@ -138,13 +139,14 @@ class ExecutorClient(object):
     def execute(self, job, item, pipeline, dependent_changes=[],
                 merger_items=[]):
+        log = get_annotated_logger(self.log, item.event)
         tenant = pipeline.tenant
         uuid = str(uuid4().hex)
         nodeset = item.current_build_set.getJobNodeSet(job.name)
-        self.log.info(
+        log.info(
             "Execute job %s (uuid: %s) on nodes %s for change %s "
-            "with dependent changes %s" % (
-                job, uuid, nodeset, item.change, dependent_changes))
+            "with dependent changes %s",
+            job, uuid, nodeset, item.change, dependent_changes)
         project = dict(
             name=item.change.project.name,

@@ -300,13 +302,13 @@ class ExecutorClient(object):
                 src_dir=os.path.join('src', p.canonical_name),
                 required=(p in required_projects),
             ))

-        build = Build(job, uuid)
+        params['zuul_event_id'] = item.event.zuul_event_id
+        build = Build(job, uuid, zuul_event_id=item.event.zuul_event_id)
         build.parameters = params
         build.nodeset = nodeset

-        self.log.debug("Adding build %s of job %s to item %s" %
-                       (build, job, item))
+        log.debug("Adding build %s of job %s to item %s",
+                  build, job, item)
         item.addBuild(build)

         if job.name == 'noop':

@@ -353,18 +355,17 @@ class ExecutorClient(object):
             self.gearman.submitJob(gearman_job, precedence=precedence,
                                    timeout=300)
         except Exception:
-            self.log.exception("Unable to submit job to Gearman")
+            log.exception("Unable to submit job to Gearman")
             self.onBuildCompleted(gearman_job, 'EXCEPTION')
             return build

         if not gearman_job.handle:
-            self.log.error("No job handle was received for %s after"
-                           " 300 seconds; marking as lost." %
-                           gearman_job)
+            log.error("No job handle was received for %s after"
+                      " 300 seconds; marking as lost.",
+                      gearman_job)
             self.onBuildCompleted(gearman_job, 'NO_HANDLE')

-        self.log.debug("Received handle %s for %s" % (gearman_job.handle,
-                                                      build))
+        log.debug("Received handle %s for %s", gearman_job.handle, build)

         return build

@@ -410,6 +411,9 @@ class ExecutorClient(object):
         build = self.builds.get(job.unique)
         if build:
+            log = get_annotated_logger(self.log, build.zuul_event_id,
+                                       build=job.unique)
+
             data = getJobData(job)
             build.node_labels = data.get('node_labels', [])
             build.node_name = data.get('node_name')

@@ -442,8 +446,8 @@ class ExecutorClient(object):
             result_data = data.get('data', {})
             warnings = data.get('warnings', [])
-            self.log.info("Build %s complete, result %s, warnings %s" %
-                          (job, result, warnings))
+            log.info("Build complete, result %s, warnings %s",
+                     result, warnings)
             # If the build should be retried, don't supply the result
             # so that elsewhere we don't have to deal with keeping
             # track of which results are non-final.

zuul/executor/server.py

@@ -216,11 +216,13 @@ class Watchdog(object):
 class SshAgent(object):
-    log = logging.getLogger("zuul.ExecutorServer")

-    def __init__(self):
+    def __init__(self, zuul_event_id=None, build=None):
         self.env = {}
         self.ssh_agent = None
+        self.log = get_annotated_logger(
+            logging.getLogger("zuul.ExecutorServer"),
+            zuul_event_id, build=build)

     def start(self):
         if self.ssh_agent:

@@ -512,7 +514,8 @@ class JobDir(object):
 class UpdateTask(object):
-    def __init__(self, connection_name, project_name):
+    def __init__(self, connection_name, project_name, zuul_event_id=None,
+                 build=None):
         self.connection_name = connection_name
         self.project_name = project_name
         self.canonical_name = None

@@ -521,6 +524,10 @@ class UpdateTask(object):
         self.event = threading.Event()
         self.success = False

+        # These variables are used for log annotation
+        self.zuul_event_id = zuul_event_id
+        self.build = build
+
     def __eq__(self, other):
         if (other and other.connection_name == self.connection_name and
                 other.project_name == self.project_name):

@@ -663,11 +670,12 @@ class AnsibleJob(object):
     def __init__(self, executor_server, job):
         logger = logging.getLogger("zuul.AnsibleJob")
-        # TODO(tobiash): Add zuul event id when it's plumbed through
-        self.log = get_annotated_logger(logger, None, build=job.unique)
+        self.arguments = json.loads(job.arguments)
+        self.zuul_event_id = self.arguments.get('zuul_event_id')
+        self.log = get_annotated_logger(
+            logger, self.zuul_event_id, build=job.unique)
         self.executor_server = executor_server
         self.job = job
-        self.arguments = json.loads(job.arguments)
         self.jobdir = None
         self.proc = None
         self.proc_lock = threading.Lock()

@@ -697,7 +705,8 @@ class AnsibleJob(object):
             self.executor_server.config,
             'executor',
             'winrm_read_timeout_sec')
-        self.ssh_agent = SshAgent()
+        self.ssh_agent = SshAgent(zuul_event_id=self.zuul_event_id,
+                                  build=self.job.unique)

         self.executor_variables_file = None

@@ -823,7 +832,9 @@ class AnsibleJob(object):
         for project in args['projects']:
             self.log.debug("Updating project %s" % (project,))
             tasks.append(self.executor_server.update(
-                project['connection'], project['name']))
+                project['connection'], project['name'],
+                zuul_event_id=self.zuul_event_id,
+                build=self.job.unique))
             projects.add((project['connection'], project['name']))

         # ...as well as all playbook and role projects.

@@ -838,7 +849,9 @@ class AnsibleJob(object):
             self.log.debug("Updating playbook or role %s" % (repo['project'],))
             key = (repo['connection'], repo['project'])
             if key not in projects:
-                tasks.append(self.executor_server.update(*key))
+                tasks.append(self.executor_server.update(
+                    *key, zuul_event_id=self.zuul_event_id,
+                    build=self.job.unique))
                 projects.add(key)

         for task in tasks:

@@ -2516,13 +2529,17 @@ class ExecutorServer(object):
         if task is None:
             # We are asked to stop
             raise StopException()
+        log = get_annotated_logger(
+            self.log, task.zuul_event_id, build=task.build)
         try:
             lock = self.repo_locks.getRepoLock(
                 task.connection_name, task.project_name)
             with lock:
-                self.log.info("Updating repo %s/%s",
-                              task.connection_name, task.project_name)
-                self.merger.updateRepo(task.connection_name, task.project_name)
+                log.info("Updating repo %s/%s",
+                         task.connection_name, task.project_name)
+                self.merger.updateRepo(
+                    task.connection_name, task.project_name,
+                    zuul_event_id=task.zuul_event_id, build=task.build)
                 repo = self.merger.getRepo(
                     task.connection_name, task.project_name)
                 source = self.connections.getSource(task.connection_name)

@@ -2530,18 +2547,20 @@ class ExecutorServer(object):
                 task.canonical_name = project.canonical_name
                 task.branches = repo.getBranches()
                 task.refs = [r.name for r in repo.getRefs()]
-            self.log.debug("Finished updating repo %s/%s",
-                           task.connection_name, task.project_name)
+            log.debug("Finished updating repo %s/%s",
+                      task.connection_name, task.project_name)
             task.success = True
         except Exception:
-            self.log.exception('Got exception while updating repo %s/%s',
-                               task.connection_name, task.project_name)
+            log.exception('Got exception while updating repo %s/%s',
+                          task.connection_name, task.project_name)
         finally:
             task.setComplete()

-    def update(self, connection_name, project_name):
+    def update(self, connection_name, project_name, zuul_event_id=None,
+               build=None):
         # Update a repository in the main merger
-        task = UpdateTask(connection_name, project_name)
+        task = UpdateTask(connection_name, project_name,
+                          zuul_event_id=zuul_event_id, build=build)
         task = self.update_queue.put(task)
         return task

zuul/manager/__init__.py

@@ -397,21 +397,23 @@ class PipelineManager(object):
         return True

     def _executeJobs(self, item, jobs):
-        self.log.debug("Executing jobs for change %s" % item.change)
+        log = get_annotated_logger(self.log, item.event)
+        log.debug("Executing jobs for change %s", item.change)
         build_set = item.current_build_set
         for job in jobs:
-            self.log.debug("Found job %s for change %s" % (job, item.change))
+            log.debug("Found job %s for change %s", job, item.change)
             try:
                 nodeset = item.current_build_set.getJobNodeSet(job.name)
                 self.sched.nodepool.useNodeSet(
-                    nodeset, build_set=item.current_build_set)
+                    nodeset, build_set=item.current_build_set,
+                    event=item.event)
                 self.sched.executor.execute(
                     job, item, self.pipeline,
                     build_set.dependent_changes,
                     build_set.merger_items)
             except Exception:
-                self.log.exception("Exception while executing job %s "
-                                   "for change %s:" % (job, item.change))
+                log.exception("Exception while executing job %s "
+                              "for change %s:", job, item.change)
                 try:
                     # If we hit an exception we don't have a build in the
                     # current item so a potentially aquired semaphore must be

@@ -419,7 +421,7 @@ class PipelineManager(object):
                     tenant = item.pipeline.tenant
                     tenant.semaphore_handler.release(item, job)
                 except Exception:
-                    self.log.exception("Exception while releasing semaphore")
+                    log.exception("Exception while releasing semaphore")

     def executeJobs(self, item):
         # TODO(jeblair): This should return a value indicating a job

zuul/merger/merger.py

@@ -148,8 +148,8 @@ class Repo(object):
             # connection and DoS Gerrit.
             client.close()

-    def _ensure_cloned(self, zuul_event_id):
-        log = get_annotated_logger(self.log, zuul_event_id)
+    def _ensure_cloned(self, zuul_event_id, build=None):
+        log = get_annotated_logger(self.log, zuul_event_id, build=build)
         repo_is_cloned = os.path.exists(os.path.join(self.local_path, '.git'))
         if self._initialized and repo_is_cloned:
             try:

@@ -173,7 +173,7 @@ class Repo(object):
             log.debug("Cloning from %s to %s",
                       redact_url(clone_url), self.local_path)
-            self._git_clone(clone_url, zuul_event_id)
+            self._git_clone(clone_url, zuul_event_id, build=build)

         repo = git.Repo(self.local_path)
         repo.git.update_environment(**self.env)

@@ -197,8 +197,8 @@ class Repo(object):
     def isInitialized(self):
         return self._initialized

-    def _git_clone(self, url, zuul_event_id):
-        log = get_annotated_logger(self.log, zuul_event_id)
+    def _git_clone(self, url, zuul_event_id, build=None):
+        log = get_annotated_logger(self.log, zuul_event_id, build=build)
         mygit = git.cmd.Git(os.getcwd())
         mygit.update_environment(**self.env)

@@ -260,17 +260,17 @@ class Repo(object):
         with repo.remotes.origin.config_writer as config_writer:
             config_writer.set('url', url)

-    def createRepoObject(self, zuul_event_id):
-        self._ensure_cloned(zuul_event_id)
+    def createRepoObject(self, zuul_event_id, build=None):
+        self._ensure_cloned(zuul_event_id, build=build)
         repo = git.Repo(self.local_path)
         repo.git.update_environment(**self.env)
         return repo

-    def reset(self, zuul_event_id=None):
-        log = get_annotated_logger(self.log, zuul_event_id)
+    def reset(self, zuul_event_id=None, build=None):
+        log = get_annotated_logger(self.log, zuul_event_id, build=build)
         log.debug("Resetting repository %s", self.local_path)
-        self.update(zuul_event_id=zuul_event_id)
-        repo = self.createRepoObject(zuul_event_id)
+        self.update(zuul_event_id=zuul_event_id, build=build)
+        repo = self.createRepoObject(zuul_event_id, build=build)
         origin = repo.remotes.origin
         seen = set()
         head = None

@@ -445,9 +445,9 @@ class Repo(object):
         log.debug("Pushing %s:%s to %s", local, remote, self.remote_url)
         repo.remotes.origin.push('%s:%s' % (local, remote))

-    def update(self, zuul_event_id=None):
-        log = get_annotated_logger(self.log, zuul_event_id)
-        repo = self.createRepoObject(zuul_event_id)
+    def update(self, zuul_event_id=None, build=None):
+        log = get_annotated_logger(self.log, zuul_event_id, build=build)
+        repo = self.createRepoObject(zuul_event_id, build=build)
         log.debug("Updating repository %s" % self.local_path)
         if repo.git.version_info[:2] < (1, 9):
             # Before 1.9, 'git fetch --tags' did not include the

@@ -595,14 +595,15 @@ class Merger(object):
         return self._addProject(hostname, project_name, url, sshkey,
                                 zuul_event_id)

-    def updateRepo(self, connection_name, project_name, zuul_event_id=None):
-        log = get_annotated_logger(self.log, zuul_event_id)
+    def updateRepo(self, connection_name, project_name, zuul_event_id=None,
+                   build=None):
+        log = get_annotated_logger(self.log, zuul_event_id, build=build)
         repo = self.getRepo(connection_name, project_name,
                             zuul_event_id=zuul_event_id)
         try:
             log.info("Updating local repository %s/%s",
                      connection_name, project_name)
-            repo.reset()
+            repo.reset(zuul_event_id=zuul_event_id, build=build)
         except Exception:
             log.exception("Unable to update %s/%s",
                           connection_name, project_name)

zuul/model.py

@@ -1813,7 +1813,7 @@ class Build(object):
     Job (related builds are grouped together in a BuildSet).
     """

-    def __init__(self, job, uuid):
+    def __init__(self, job, uuid, zuul_event_id=None):
         self.job = job
         self.uuid = uuid
         self.url = None

@@ -1833,6 +1833,7 @@ class Build(object):
         self.node_labels = []
         self.node_name = None
         self.nodeset = None
+        self.zuul_event_id = zuul_event_id

     def __repr__(self):
         return ('<Build %s of %s voting:%s on %s>' %

zuul/nodepool.py

@@ -201,7 +201,7 @@ class Nodepool(object):
             self.log.debug("Removing autohold for %s", autohold_key)
             del self.sched.autohold_requests[autohold_key]

-    def useNodeSet(self, nodeset, build_set=None):
+    def useNodeSet(self, nodeset, build_set=None, event=None):
         self.log.info("Setting nodeset %s in use" % (nodeset,))
         resources = defaultdict(int)
         for node in nodeset.getNodes():

zuul/scheduler.py

@@ -1269,6 +1269,8 @@ class Scheduler(threading.Thread):
     def _doBuildCompletedEvent(self, event):
         build = event.build
+        zuul_event_id = build.zuul_event_id
+        log = get_annotated_logger(self.log, zuul_event_id)

         # Regardless of any other conditions which might cause us not
         # to pass this on to the pipeline manager, make sure we return

@@ -1276,27 +1278,25 @@ class Scheduler(threading.Thread):
         try:
             self._processAutohold(build)
         except Exception:
-            self.log.exception("Unable to process autohold for %s" % build)
+            log.exception("Unable to process autohold for %s" % build)
         try:
             self.nodepool.returnNodeSet(build.nodeset, build)
         except Exception:
-            self.log.exception("Unable to return nodeset %s" % build.nodeset)
+            log.exception("Unable to return nodeset %s" % build.nodeset)

         if build.build_set is not build.build_set.item.current_build_set:
-            self.log.debug("Build %s is not in the current build set" %
-                           (build,))
+            log.debug("Build %s is not in the current build set", build)
             return
         pipeline = build.build_set.item.pipeline
         if not pipeline:
-            self.log.warning("Build %s is not associated with a pipeline" %
-                             (build,))
+            log.warning("Build %s is not associated with a pipeline", build)
             return
         if build.end_time and build.start_time and build.result:
             duration = build.end_time - build.start_time
             try:
                 self.time_database.update(build, duration, build.result)
             except Exception:
-                self.log.exception("Exception recording build time:")
+                log.exception("Exception recording build time:")

         pipeline.manager.onBuildCompleted(event.build)

     def _doMergeCompletedEvent(self, event):