Use a transaction for BuildCompletedEvent

There is a race condition where an executor may crash and leave
a stuck build.  We can avoid that by performing the following two
actions in a transaction:

* Update the build request state to COMPLETED
* Submit the BuildCompletedEvent to the event queue

The race condition occurs when the build request is marked as completed
but no BuildCompletedEvent arrives.  In that case, Zuul sees the
completed build request and assumes that the event will be forthcoming;
therefore the build request itself is not considered lost.  The only
way for such a build request to be removed is a buildset reset.

By including these operations in a transaction, only the following
states are possible if the executor crashes:

* It crashes before the build is complete: the build is declared lost
  and restarted.
* It crashes after the build is complete: the scheduler doesn't care.

Transactions are limited to 1MB just like any other ZK network
operation.  The result data can be large, but we already move it to a
side channel when it exceeds a certain size, so only the event znode
and the request znode themselves need to be part of the transaction.

Change-Id: Ibedf2c5db825fb444f652b60e1c6f2c7aadc6950
Author: James E. Blair  2022-02-23 18:49:22 -08:00
Commit: 61822ec737 (parent: 7efc4f2533)
5 changed files with 157 additions and 73 deletions


@@ -33,6 +33,8 @@ import traceback
from concurrent.futures.process import ProcessPoolExecutor, BrokenProcessPool
from kazoo.exceptions import NoNodeError
from kazoo.retry import KazooRetry
import git
from urllib.parse import urlsplit
@@ -4061,35 +4063,7 @@ class ExecutorServer(BaseMergeServer):
log.exception("Unable to process autohold for %s",
build_request)
def update_build_request(log, build_request):
try:
self.executor_api.update(build_request)
return True
except JobRequestNotFound as e:
log.warning("Could not find build: %s", str(e))
return False
def put_complete_event(log, build_request, event):
try:
self.result_events[build_request.tenant_name][
build_request.pipeline_name].put(event)
except NoNodeError:
log.warning("Pipeline was removed: %s",
build_request.pipeline_name)
build_request.state = BuildRequest.COMPLETED
found = self._retry(build_request.lock, log,
update_build_request, log, build_request)
lock_valid = build_request.lock.is_still_valid()
if lock_valid:
# We only need to unlock if we're still locked.
self._retry(build_request.lock, log, self.executor_api.unlock,
build_request)
if not found:
# If the build request is gone, don't return a result.
return
lock_valid = self.zkRetry(log, build_request.lock.is_still_valid)
if not lock_valid:
# If we lost the lock at any point before updating the
# state to COMPLETED, then the scheduler may have (or
@@ -4106,21 +4080,45 @@ class ExecutorServer(BaseMergeServer):
# discarded.
return
# TODO: This is racy. Once we have set the build request to
# completed, the only way for it to be deleted is for the
# scheduler to process a BuildRequestCompleted event. So we
# need to try really hard to give it one. But if we exit
# between the section above and the section below, we won't,
# which will mean that the scheduler will not automatically
# delete the build request and we will not be able to recover.
#
# This is essentially a two-phase commit problem, but we are
# unable to use transactions because the result event is
# sharded. We should be able to redesign the result reporting
# mechanism to eliminate the race and be more convergent.
updater = self.executor_api.getRequestUpdater(build_request)
event = BuildCompletedEvent(
build_request.uuid, build_request.build_set_uuid,
build_request.job_name, build_request.path, result,
build_request.event_id)
self._retry(None, log, put_complete_event, log,
build_request, event)
build_request.state = BuildRequest.COMPLETED
updated = False
put_method = self.result_events[build_request.tenant_name][
build_request.pipeline_name].put
try:
self.zkRetry(log, put_method, event, updater=updater)
updated = True
except JobRequestNotFound as e:
log.warning("Could not find build: %s", str(e))
return
except NoNodeError:
log.warning("Pipeline was removed: %s",
build_request.pipeline_name)
if not updated:
# If the pipeline was removed but the request remains, we
# should still update the build request just in case, in
# order to prevent another executor from starting an
unnecessary build.
self.zkRetry(log, self.executor_api.update, build_request)
self.zkRetry(log, self.executor_api.unlock, build_request)
def zkRetry(self, log, func, *args, **kw):
# This retries func after retryable exceptions from ZK, but
# only as long as we're still running.
# TODO: Replace MergerServer._retry with a version of this
# method.
def interrupt():
return not self._running
kazoo_retry = KazooRetry(max_tries=-1, interrupt=interrupt,
delay=5, backoff=0, ignore_expire=False)
try:
return kazoo_retry(func, *args, **kw)
except InterruptedError:
pass


@@ -3171,6 +3171,8 @@ class JobRequest:
ALL_STATES = (UNSUBMITTED, REQUESTED, HOLD, RUNNING, COMPLETED)
# This object participates in transactions, and therefore must
# remain small and unsharded.
def __init__(self, uuid, precedence=None, state=None, result_path=None):
self.uuid = uuid
if precedence is None:


@@ -225,7 +225,7 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
if not self._remove(event.ack_ref.path, event.ack_ref.version):
self.log.warning("Event %s was already acknowledged", event)
def _put(self, data):
def _put(self, data, updater=None):
# Event data can be large, so we want to shard it. But events
# also need to be atomic (we don't want an event listener to
# start processing a half-stored event). A natural solution
@@ -247,13 +247,30 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
# contains the bulk of the data. We extract it here into the
# side channel data, then in _iterEvents we re-constitute it
# into the dictionary.
#
# If `updater` is supplied, it is a RequestUpdater instance, and
# we are engaged in a cooperative transaction with a job
# request.
# Add a trailing / for our sequence nodes
event_path = f"{self.event_root}/"
# Add a trailing /q for our sequence nodes; transaction
# sequence nodes need a character after / otherwise they drop
# the /.
event_path = f"{self.event_root}/q"
if updater and not updater.preRun():
# Don't even try the update
return
# Write the side channel data here under the assumption that
# the transaction will proceed. If the transaction fails, it
# will be cleaned up after about 5 minutes.
side_channel_data = None
size_limit = sharding.NODE_BYTE_SIZE_LIMIT
if updater:
# If we are in a transaction, leave enough room to share.
size_limit /= 2
encoded_data = json.dumps(data, sort_keys=True).encode("utf-8")
if (len(encoded_data) > sharding.NODE_BYTE_SIZE_LIMIT
if (len(encoded_data) > size_limit
and 'event_data' in data):
# Get a unique data node
data_id = str(uuid.uuid4())
@@ -269,8 +286,27 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
self.kazoo_client, data_root) as stream:
stream.write(side_channel_data)
return self.kazoo_client.create(
event_path, encoded_data, sequence=True)
if updater is None:
return self.kazoo_client.create(
event_path, encoded_data, sequence=True)
# The rest of the method is the updater case. Start a
# transaction.
transaction = self.kazoo_client.transaction()
# Add transaction tasks.
transaction.create(event_path, encoded_data, sequence=True)
updater.run(transaction)
# Commit the transaction and process the results.  The commit
# returns a list with one entry per operation, in order; each
# entry is either an exception or a return value.
result = transaction.commit()
if isinstance(result[0], Exception):
raise result[0]
updater.postRun(result[1])
return result[0]
def _iterEvents(self):
try:
@@ -691,12 +727,12 @@ class PipelineResultEventQueue(ZooKeeperEventQueue):
def _createRegistry(cls, client, tenant_name):
return DefaultKeyDict(lambda p: cls(client, tenant_name, p))
def put(self, event):
def put(self, event, updater=None):
data = {
"event_type": type(event).__name__,
"event_data": event.toDict(),
}
self._put(data)
self._put(data, updater)
def __iter__(self):
for data, ack_ref, _ in self._iterEvents():


@@ -127,6 +127,9 @@ class ExecutorApi:
def submit(self, request, params):
return self.zone_queues[request.zone].submit(request, params)
def getRequestUpdater(self, request):
return self.zone_queues[request.zone].getRequestUpdater(request)
def update(self, request):
return self.zone_queues[request.zone].update(request)


@@ -20,7 +20,8 @@ from contextlib import suppress
from enum import Enum
from kazoo.exceptions import LockTimeout, NoNodeError
from kazoo.protocol.states import EventType
from kazoo.protocol.states import EventType, ZnodeStat
from kazoo.client import TransactionRequest
from zuul.lib.jsonutil import json_dumps
from zuul.lib.logutil import get_annotated_logger
@@ -40,6 +41,60 @@ class JobRequestEvent(Enum):
DELETED = 4
class RequestUpdater:
"""This class cooperates with the event queues so that we can update a
request and submit an event in a single transaction."""
_log = logging.getLogger("zuul.JobRequestQueue")
def __init__(self, request):
self.request = request
self.log = get_annotated_logger(
self._log, event=request.event_id, build=request.uuid
)
def preRun(self):
"""A pre-flight check. Return whether we should attempt the
transaction."""
self.log.debug("Updating request %s", self.request)
if self.request._zstat is None:
self.log.debug(
"Cannot update request %s: Missing version information.",
self.request.uuid,
)
return False
return True
def run(self, client):
"""Actually perform the transaction. The 'client' argument may be a
transaction or a plain client."""
if isinstance(client, TransactionRequest):
setter = client.set_data
else:
setter = client.set
return setter(
self.request.path,
JobRequestQueue._dictToBytes(self.request.toDict()),
version=self.request._zstat.version,
)
def postRun(self, result):
"""Process the results of the transaction."""
try:
if isinstance(result, Exception):
raise result
elif isinstance(result, ZnodeStat):
self.request._zstat = result
else:
raise Exception("Unknown result from ZooKeeper for %s: %s",
self.request, result)
except NoNodeError:
raise JobRequestNotFound(
f"Could not update {self.request.path}"
)
class JobRequestQueue(ZooKeeperSimpleBase):
log = logging.getLogger("zuul.JobRequestQueue")
request_class = JobRequest
@@ -233,30 +288,20 @@ class JobRequestQueue(ZooKeeperSimpleBase):
return result
def update(self, request):
log = get_annotated_logger(
self.log, event=request.event_id, build=request.uuid
)
log.debug("Updating request %s", request)
def getRequestUpdater(self, request):
return RequestUpdater(request)
if request._zstat is None:
log.debug(
"Cannot update request %s: Missing version information.",
request.uuid,
)
def update(self, request):
updater = self.getRequestUpdater(request)
if not updater.preRun():
return
try:
zstat = self.kazoo_client.set(
request.path,
self._dictToBytes(request.toDict()),
version=request._zstat.version,
)
# Update the zstat on the item after updating the ZK node
request._zstat = zstat
except NoNodeError:
raise JobRequestNotFound(
f"Could not update {request.path}"
)
result = updater.run(self.kazoo_client)
except Exception as e:
result = e
updater.postRun(result)
def reportResult(self, request, result):
# Write the result data first since it may be multiple nodes.
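
A note on the transaction result handling above (in _put() and in
RequestUpdater.postRun()): kazoo's commit() does not raise when an
individual operation fails; each entry in the returned list must be
inspected.  A small self-contained sketch with made-up paths:

    from kazoo.client import KazooClient
    from kazoo.exceptions import NoNodeError
    from kazoo.protocol.states import ZnodeStat

    client = KazooClient(hosts="localhost:2181")
    client.start()
    # Example znodes so the sketch runs; not Zuul's layout.
    client.ensure_path("/example/events")
    client.ensure_path("/example/request")

    transaction = client.transaction()
    transaction.create("/example/events/q", b"{}", sequence=True)
    transaction.set_data("/example/request", b"{}", version=-1)
    results = transaction.commit()

    # One entry per operation, in order: the created path for
    # create(), a ZnodeStat for set_data(), or an exception instance
    # if that operation (or the transaction as a whole) failed.
    event_result, request_result = results
    if isinstance(event_result, Exception):
        raise event_result
    if isinstance(request_result, NoNodeError):
        # The request znode disappeared; the code above maps this to
        # JobRequestNotFound.
        print("request removed before the update")
    elif isinstance(request_result, ZnodeStat):
        print("request updated to version", request_result.version)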