Look up worker_zone for log streaming from executor

Currently, we are looking up the worker_zone for the log streaming from
the BuildRequest's path in ZooKeeper. This is a problem for unzoned
builds as those builds don't provide a zone information in their path
(zone=None).

Due to this, the log streaming won't use the FingerGateways and instead
always falls back to use the direct connection to the executor. This
works as long as the executor is located in the same region as zuul-web,
but in other cases the log streaming is broken.

To fix this, the executor will now store its zone information in the
worker_info of the BuildRequest when accepting the BuildRequest. In
the streamer_utils library we will use this zone information instead of
the zone from the ZooKeeper path.

Co-Authored-By: Simon Westphahl <simon.westphahl@bmw.de>
Change-Id: I63b148fa29e05157fce032d0f41b909da8a11e87
This commit is contained in:
Felix Edel 2022-02-21 13:02:44 +01:00 committed by Tobias Henkel
parent d5385d7fc5
commit 8db6b6113a
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
4 changed files with 22 additions and 1 deletions

View File

@ -802,3 +802,15 @@ class TestStreamingZones(TestStreamingBase):
class TestStreamingZonesSSL(TestStreamingZones):
fingergw_use_ssl = True
class TestStreamingUnzonedJob(TestStreamingZones):
def setUp(self):
super().setUp()
self.fake_nodepool.attributes = None
def setup_config(self, config_file: str):
config = super().setup_config(config_file)
config.set('executor', 'allow_unzoned', 'true')
return config

View File

@ -3726,6 +3726,7 @@ class ExecutorServer(BaseMergeServer):
build_request.worker_info = {
"hostname": self.hostname,
"log_port": self.log_streaming_port,
"zone": self.zone,
}
self.executor_api.update(build_request)
except Exception:

View File

@ -188,7 +188,10 @@ def getJobLogStreamAddress(executor_api, uuid, source_zone):
# Search for the build request in ZooKeeper. This iterates over all
# available zones (inlcuding unzoned) and stops when the UUID is
# found.
build_request, worker_zone = executor_api.getByUuid(uuid)
# TODO (felix): Remove the zk_worker_zone return value after a deprecation
# period. This is kept for backwards-compatibility until all executors
# store their zone information in the worker_info dictionary.
build_request, zk_worker_zone = executor_api.getByUuid(uuid)
if build_request is None:
raise StreamingError("Build not found")
@ -197,6 +200,7 @@ def getJobLogStreamAddress(executor_api, uuid, source_zone):
if not worker_info:
raise StreamingError("Build did not start yet")
worker_zone = worker_info.get("zone", zk_worker_zone)
job_log_stream_address = {}
if worker_zone and source_zone != worker_zone:
info = _getFingerGatewayInZone(worker_zone)

View File

@ -150,6 +150,10 @@ class ExecutorApi:
for zone in self._getAllZones():
request = self.zone_queues[zone].getByUuid(uuid)
if request:
# TODO (felix): Remove the zone return value after a
# deprecation period. This is kept for backwards compatibility
# until all executors store their zone information in the
# worker_info dictionary on the BuildRequest.
return request, zone
return None, None