Try harder to unlock failed build requests

An OpenDev executor lost the ZK connection while trying to start
a build, specifically at the stage of reading the params from ZK.
In this case, it was also unable to unlock the build request
after the initial exception was raised.  The ZK connection
was resumed without losing the lock, which means that the build
request stayed in running+locked, so the cleanup method leaves
it alone.  There is no recovery path from this situation.

To correct this, we will try indefinitely to unlock a build request
after we are no longer working on it.  Further, we will also try
indefinitely to report the result to Zuul.  There is still a narrow
race condition noted inline, but this change should be a substantial
improvement until we can address that.
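
The shape of that retry is roughly the following (a simplified sketch
with hypothetical names; the real helper is the _retry method added in
the diff below):

    import logging
    import time

    log = logging.getLogger("zuul.ExecutorServer")

    def retry_forever(lock, fn, *args, running=lambda: True, delay=5):
        # Sketch only: retry a ZooKeeper operation until it succeeds,
        # the server stops, or the lock (if one is supplied) is lost,
        # in which case another executor can pick the request up again.
        while True:
            if lock is not None and not lock.is_still_valid():
                return
            try:
                return fn(*args)
            except Exception:
                log.exception("Exception retrying %s", fn)
            if not running():
                return
            time.sleep(delay)

    # e.g. retry_forever(build_request.lock, executor_api.unlock,
    #                    build_request)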

Also, fix a race that could run merge jobs twice and break their result

There is a race condition in the merger run loop that allows a merge job
to be run twice, whereby the second run breaks the result because the job
parameters were deleted during the first run.

This can occur because the merger run loop operates on cached data: a
merge request may be picked up because it appears unlocked, even though
it was already completed in a previous run.

To avoid running the request a second time, the lock() method now
updates the local request object with the current data from ZooKeeper
and the merger checks the request's state again after locking it.
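
In outline, the run loop then behaves roughly like this (a simplified
sketch with hypothetical names, not the exact merger code):

    REQUESTED = "requested"  # stands in for MergeRequest.REQUESTED

    def run_merge_job(merger_api, merge_request):
        # lock() now refreshes merge_request from ZooKeeper on success,
        # so the state check below sees current data, not cached data.
        if not merger_api.lock(merge_request, blocking=False):
            return  # another merger holds the request
        if merge_request.state != REQUESTED:
            # The request was already completed in a previous run;
            # unlock it and do not run it a second time.
            merger_api.unlock(merge_request)
            return
        # ...safe to fetch the params and run the merge job...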

This change also fixes the executor run loop, which uses the same
methods. Although we have never seen this issue there, it may be hidden
by other circumstances, since the executor API differs from the merger
API in some respects (e.g. node requests and node locking, no
synchronous results).

Change-Id: I167c0ceb757e50403532ece88a534c4412d11365
Co-Authored-By: Felix Edel <felix.edel@bmw.de>
James E. Blair 2021-09-02 16:07:29 -07:00
parent 0d9c3beace
commit 6fcde31c9e
7 changed files with 293 additions and 67 deletions

View File

@ -1,5 +1,6 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2014 Wikimedia Foundation Inc.
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -18,6 +19,7 @@ import logging
import configparser
import multiprocessing
import os
import re
import time
from unittest import mock
@ -1060,3 +1062,18 @@ class TestVarSquash(BaseTestCase):
'extra': 'extravar_extra'},
}
self.assertEqual(out, expected)
class TestExecutorFailure(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
@mock.patch('zuul.executor.server.ExecutorServer.executeJob')
def test_executor_job_start_failure(self, execute_job_mock):
execute_job_mock.side_effect = Exception('Failed to start')
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
self.assertTrue(re.search(
'- project-merge .* ERROR',
A.messages[-1]))

View File

@ -1039,8 +1039,8 @@ class TestMergerApi(ZooKeeperBaseTestCase):
b.state = MergeRequest.RUNNING
merger_api.update(b)
c.state = MergeRequest.RUNNING
merger_api.lock(c)
c.state = MergeRequest.RUNNING
merger_api.update(c)
d.state = MergeRequest.COMPLETED

View File

@ -1,4 +1,5 @@
# Copyright 2014 OpenStack Foundation
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -31,6 +32,7 @@ import time
import traceback
from concurrent.futures.process import ProcessPoolExecutor, BrokenProcessPool
from kazoo.exceptions import NoNodeError
import git
from urllib.parse import urlsplit
@ -3543,16 +3545,39 @@ class ExecutorServer(BaseMergeServer):
log = get_annotated_logger(
self.log, event=None, build=build_request.uuid
)
# Lock and update the build request
if not self.executor_api.lock(build_request, blocking=False):
return
# Ensure that the request is still in state requested. This method is
# called based on cached data and there might be a mismatch between the
# cached state and the real state of the request. The lock might
# have been successful because the request is already completed and
# thus unlocked.
if build_request.state != BuildRequest.REQUESTED:
self._retry(build_request.lock, log, self.executor_api.unlock,
build_request)
return
try:
build_request.state = BuildRequest.RUNNING
params = self.executor_api.getParams(build_request)
self.executor_api.clearParams(build_request)
# Directly update the build in ZooKeeper, so we don't
# loop over and try to lock it again and again.
# Directly update the build in ZooKeeper, so we don't loop
# over and try to lock it again and again. Do this before
# clearing the params so if we fail, no one tries to
# re-run the job.
build_request.state = BuildRequest.RUNNING
self.executor_api.update(build_request)
except Exception:
log.exception("Exception while preparing to start worker")
# If we failed at this point, we have not written anything
# to ZK yet; the only thing we need to do is to ensure
# that we release the lock, and another executor will be
# able to grab the build.
self._retry(build_request.lock, log, self.executor_api.unlock,
build_request)
return
try:
self.executor_api.clearParams(build_request)
log.debug("Next executed job: %s", build_request)
self.executeJob(build_request, params)
except Exception:
@ -3560,7 +3585,7 @@ class ExecutorServer(BaseMergeServer):
# successfully start executing the job, it's the
# AnsibleJob's responsibility to call completeBuild and
# unlock the request.
log.exception("Exception while running job")
log.exception("Exception while starting worker")
result = {
"result": "ERROR",
"exception": traceback.format_exc(),
@ -3832,46 +3857,95 @@ class ExecutorServer(BaseMergeServer):
# result dict for that.
result["end_time"] = time.time()
# NOTE (felix): We store the end_time on the ansible job to calculate
# the in-use duration of locked nodes when the nodeset is returned.
ansible_job = self.job_workers[build_request.uuid]
ansible_job.end_time = time.monotonic()
params = ansible_job.arguments
# If the result is None, check if the build has reached its max
# attempts and if so set the result to RETRY_LIMIT.
# This must be done in order to correctly process the autohold in the
# next step. Since we only want to hold the node if the build has
# reached a final result.
if result.get("result") is None:
attempts = params["zuul"]["attempts"]
max_attempts = params["max_attempts"]
if attempts >= max_attempts:
result["result"] = "RETRY_LIMIT"
zuul_event_id = params["zuul_event_id"]
log = get_annotated_logger(self.log, zuul_event_id,
log = get_annotated_logger(self.log, build_request.event_id,
build=build_request.uuid)
# Provide the hold information back to the scheduler via the build
# result.
try:
held = self._processAutohold(ansible_job, result.get("result"))
result["held"] = held
log.info("Held status set to %s", held)
except Exception:
log.exception("Unable to process autohold for %s", ansible_job)
# NOTE (felix): We store the end_time on the ansible job to calculate
# the in-use duration of locked nodes when the nodeset is returned.
# NOTE: this method may be called before we create a job worker.
ansible_job = self.job_workers.get(build_request.uuid)
if ansible_job:
ansible_job.end_time = time.monotonic()
params = ansible_job.arguments
# If the result is None, check if the build has reached
# its max attempts and if so set the result to
# RETRY_LIMIT. This must be done in order to correctly
# process the autohold in the next step. Since we only
# want to hold the node if the build has reached a final
# result.
if result.get("result") is None:
attempts = params["zuul"]["attempts"]
max_attempts = params["max_attempts"]
if attempts >= max_attempts:
result["result"] = "RETRY_LIMIT"
# Provide the hold information back to the scheduler via the build
# result.
try:
held = self._processAutohold(ansible_job, result.get("result"))
result["held"] = held
log.info("Held status set to %s", held)
except Exception:
log.exception("Unable to process autohold for %s", ansible_job)
def update_build_request(log, build_request):
try:
self.executor_api.update(build_request)
return True
except JobRequestNotFound as e:
log.warning("Could not find build: %s", str(e))
return False
def put_complete_event(log, build_request, event):
try:
self.result_events[build_request.tenant_name][
build_request.pipeline_name].put(event)
except NoNodeError:
log.warning("Pipeline was removed: %s",
build_request.pipeline_name)
build_request.state = BuildRequest.COMPLETED
try:
self.executor_api.update(build_request)
except JobRequestNotFound as e:
self.log.warning("Could not complete build: %s", str(e))
found = self._retry(build_request.lock, log,
update_build_request, log, build_request)
lock_valid = build_request.lock.is_still_valid()
if lock_valid:
# We only need to unlock if we're still locked.
self._retry(build_request.lock, log, self.executor_api.unlock,
build_request)
if not found:
# If the build request is gone, don't return a result.
return
# Unlock the build request
self.executor_api.unlock(build_request)
if not lock_valid:
# If we lost the lock at any point before updating the
# state to COMPLETED, then the scheduler may have (or
# will) detect it as an incomplete build and generate an
# error event for us. We don't need to submit a
# completion event in that case.
#
# TODO: If we make the scheduler robust against receiving
# duplicate completion events for the same build, we could
# choose to continue here and submit the completion event in
# the hopes that we would win the race against the cleanup
# thread. That might (in some narrow circumstances)
# rescue an otherwise acceptable build from being
# discarded.
return
# TODO: This is racy. Once we have set the build request to
# completed, the only way for it to be deleted is for the
# scheduler to process a BuildRequestCompleted event. So we
# need to try really hard to give it one. But if we exit
# between the section above and the section below, we won't,
# which will mean that the scheduler will not automatically
# delete the build request and we will not be able to recover.
#
# This is essentially a two-phase commit problem, but we are
# unable to use transactions because the result event is
# sharded. We should be able to redesign the result reporting
# mechanism to eliminate the race and be more convergent.
event = BuildCompletedEvent(build_request.uuid, result)
self.result_events[build_request.tenant_name][
build_request.pipeline_name].put(event)
self._retry(None, log, put_complete_event, log,
build_request, event)

View File

@ -22,6 +22,8 @@ import time
from abc import ABCMeta
from configparser import ConfigParser
from kazoo.exceptions import NoNodeError
from zuul.lib import commandsocket
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
@ -204,14 +206,34 @@ class BaseMergeServer(metaclass=ABCMeta):
log = get_annotated_logger(
self.log, merge_request.event_id
)
# Lock and update the merge request
if not self.merger_api.lock(merge_request, blocking=False):
return
# Ensure that the request is still in state requested. This method is
# called based on cached data and there might be a mismatch between the
# cached state and the real state of the request. The lock might
# have been successful because the request is already completed and
# thus unlocked.
if merge_request.state != MergeRequest.REQUESTED:
self._retry(merge_request.lock, log, self.merger_api.unlock,
merge_request)
return
try:
merge_request.state = MergeRequest.RUNNING
params = self.merger_api.getParams(merge_request)
except Exception:
log.exception("Exception while preparing to start merge job")
# If we failed at this point, we have not written anything
# to ZK yet; the only thing we need to do is to ensure
# that we release the lock, and another merger will be
# able to grab the request.
self._retry(merge_request.lock, log, self.merger_api.unlock,
merge_request)
return
result = None
try:
merge_request.state = MergeRequest.RUNNING
params = self.merger_api.getParams(merge_request)
self.merger_api.clearParams(merge_request)
# Directly update the merge request in ZooKeeper, so we
# don't loop over and try to lock it again and again.
@ -344,13 +366,18 @@ class BaseMergeServer(metaclass=ABCMeta):
item_in_branches,
)
lock_valid = merge_request.lock.is_still_valid()
if not lock_valid:
return
# Provide a result either via a result future or a result event
if merge_request.result_path:
log.debug(
"Providing synchronous result via future for %s",
merge_request,
)
self.merger_api.reportResult(merge_request, result)
self._retry(merge_request.lock, log,
self.merger_api.reportResult, merge_request, result)
elif merge_request.build_set_uuid:
log.debug(
@ -373,10 +400,16 @@ class BaseMergeServer(metaclass=ABCMeta):
item_in_branches,
)
tenant_name = merge_request.tenant_name
pipeline_name = merge_request.pipeline_name
def put_complete_event(log, merge_request, event):
try:
self.result_events[merge_request.tenant_name][
merge_request.pipeline_name].put(event)
except NoNodeError:
log.warning("Pipeline was removed: %s",
merge_request.pipeline_name)
self.result_events[tenant_name][pipeline_name].put(event)
self._retry(merge_request.lock, log,
put_complete_event, log, merge_request, event)
# Set the merge request to completed, unlock and delete it. Although
# the state update is mainly for consistency reasons, it might come in
@ -384,12 +417,43 @@ class BaseMergeServer(metaclass=ABCMeta):
# the merge request was already processed and we have a result in the
# result queue.
merge_request.state = MergeRequest.COMPLETED
self.merger_api.update(merge_request)
self.merger_api.unlock(merge_request)
self._retry(merge_request.lock, log,
self.merger_api.update, merge_request)
self._retry(merge_request.lock, log,
self.merger_api.unlock, merge_request)
# TODO (felix): If we want to optimize ZK requests, we could only call
# the remove() here.
self.merger_api.remove(merge_request)
def _retry(self, lock, log, fn, *args, **kw):
"""Retry a method to deal with ZK connection issues
This is a helper method to retry ZK operations as long as it
makes sense to do so. If we have encountered a suspended
connection, we can probably just retry the ZK operation until
it succeeds. If we have fully lost the connection, then we
have lost the lock, so we may not care in that case.
This method exits when one of the following occurs:
* The callable function completes.
* This server stops.
* The lock (if supplied) is invalidated due to connection loss.
Pass None as the lock parameter if the lock issue is not
relevant.
"""
while True:
if lock and not lock.is_still_valid():
return
try:
return fn(*args, **kw)
except Exception:
log.exception("Exception retrying %s", fn)
if not self._running:
return
time.sleep(5)
class MergeServer(BaseMergeServer):
log = logging.getLogger("zuul.MergeServer")

View File

@ -2112,6 +2112,11 @@ class JobRequest:
"result_path": self.result_path,
}
def updateFromDict(self, data):
self.precedence = data["precedence"]
self.state = data["state"]
self.result_path = data["result_path"]
@classmethod
def fromDict(cls, data):
return cls(

View File

@ -21,7 +21,6 @@ from enum import Enum
from kazoo.exceptions import LockTimeout, NoNodeError
from kazoo.protocol.states import EventType
from kazoo.recipe.lock import Lock
from zuul.lib.jsonutil import json_dumps
from zuul.lib.logutil import get_annotated_logger
@ -30,6 +29,7 @@ from zuul.zk import ZooKeeperSimpleBase, sharding
from zuul.zk.event_queues import JobResultFuture
from zuul.zk.exceptions import JobRequestNotFound
from zuul.zk.vendor.watchers import ExistingDataWatch
from zuul.zk.locks import SessionAwareLock
class JobRequestEvent(Enum):
@ -292,6 +292,23 @@ class JobRequestQueue(ZooKeeperSimpleBase):
return request
def refresh(self, request):
"""Refreshs a request object with the current data from ZooKeeper. """
try:
data, zstat = self.kazoo_client.get(request.path)
except NoNodeError:
raise JobRequestNotFound(
f"Could not refresh {request}, ZooKeeper node is missing")
if not data:
raise JobRequestNotFound(
f"Could not refresh {request}, ZooKeeper node is empty")
content = self._bytesToDict(data)
request.updateFromDict(content)
request._zstat = zstat
def remove(self, request):
self.log.debug("Removing request %s", request)
try:
@ -340,7 +357,7 @@ class JobRequestQueue(ZooKeeperSimpleBase):
have_lock = False
lock = None
try:
lock = Lock(self.kazoo_client, path)
lock = SessionAwareLock(self.kazoo_client, path)
have_lock = lock.acquire(blocking, timeout)
except LockTimeout:
have_lock = False
@ -354,20 +371,14 @@ class JobRequestQueue(ZooKeeperSimpleBase):
return False
if not self.kazoo_client.exists(request.path):
lock.release()
self.log.error(
"Request not found for locking: %s", request.uuid
)
# We may have just re-created the lock parent node just after the
# scheduler deleted it; therefore we should (re-) delete it.
try:
# Delete the lock parent node as well.
path = "/".join([self.LOCK_ROOT, request.uuid])
self.kazoo_client.delete(path, recursive=True)
except NoNodeError:
pass
self._releaseLock(request, lock)
return False
# Update the request to ensure that we operate on the newest data.
try:
self.refresh(request)
except JobRequestNotFound:
self._releaseLock(request, lock)
return False
request.lock = lock
@ -380,6 +391,24 @@ class JobRequestQueue(ZooKeeperSimpleBase):
return True
def _releaseLock(self, request, lock):
"""Releases a lock.
This is used directly after acquiring the lock in case something went
wrong.
"""
lock.release()
self.log.error("Request not found for locking: %s", request.uuid)
# We may have just re-created the lock parent node just after the
# scheduler deleted it; therefore we should (re-) delete it.
try:
# Delete the lock parent node as well.
path = "/".join([self.LOCK_ROOT, request.uuid])
self.kazoo_client.delete(path, recursive=True)
except NoNodeError:
pass
def unlock(self, request):
if request.lock is None:
self.log.warning(
@ -391,7 +420,7 @@ class JobRequestQueue(ZooKeeperSimpleBase):
def isLocked(self, request):
path = "/".join([self.LOCK_ROOT, request.uuid])
lock = Lock(self.kazoo_client, path)
lock = SessionAwareLock(self.kazoo_client, path)
is_locked = len(lock.contenders()) > 0
return is_locked

View File

@ -16,13 +16,50 @@ import logging
from contextlib import contextmanager
from urllib.parse import quote_plus
from kazoo.protocol.states import KazooState
from zuul.zk.exceptions import LockException
from zuul.zk.vendor.lock import ReadLock, WriteLock
from zuul.zk.vendor.lock import Lock, ReadLock, WriteLock
LOCK_ROOT = "/zuul/locks"
TENANT_LOCK_ROOT = f"{LOCK_ROOT}/tenant"
class SessionAwareLock(Lock):
def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
self._zuul_ephemeral = None
self._zuul_session_expired = False
self._zuul_watching_session = False
super().__init__(client, path, identifier, extra_lock_patterns)
def acquire(self, blocking=True, timeout=None, ephemeral=True):
ret = super().acquire(blocking, timeout, ephemeral)
self._zuul_session_expired = False
if ret and ephemeral:
self._zuul_ephemeral = ephemeral
self.client.add_listener(self._zuul_session_watcher)
self._zuul_watching_session = True
return ret
def release(self):
if self._zuul_watching_session:
self.client.remove_listener(self._zuul_session_watcher)
self._zuul_watching_session = False
return super().release()
def _zuul_session_watcher(self, state):
if state == KazooState.LOST:
self._zuul_session_expired = True
# Return true to de-register
return True
def is_still_valid(self):
if not self._zuul_ephemeral:
return True
return not self._zuul_session_expired
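# Illustrative usage sketch (hypothetical caller; the real callers are
# the job request queue and the merger/executor servers above): hold the
# lock while working, and check that the session never expired before
# acting on the result.
#
#     lock = SessionAwareLock(client, "/zuul/locks/example")
#     if lock.acquire(blocking=False):
#         try:
#             result = do_work()
#             if lock.is_still_valid():
#                 report_result(result)
#         finally:
#             lock.release()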
@contextmanager
def locked(lock, blocking=True, timeout=None):
if not lock.acquire(blocking=blocking, timeout=timeout):