Make time estimation synchronous
With two schedulers, we can no longer guarantee that the two-step time estimation query happens on the same scheduler. Remove the background thread and make the lookup synchronous. This may add some time to the main loop, but can be mitigated by multiple schedulers now. Change-Id: I86529a833658ce15c4e50323b77c8fd76d84486c
This commit is contained in:
parent
5777ebc0ee
commit
9f969862df
@ -13,63 +13,28 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import queue
|
||||
|
||||
import cachetools
|
||||
|
||||
|
||||
class Times:
|
||||
"""Perform asynchronous database queries to estimate build times.
|
||||
|
||||
To avoid allowing the SQL database to become a bottelneck when
|
||||
launching builds, this class performs asynchronous queries against
|
||||
the DB and returns estimated build times.
|
||||
|
||||
This is intended as a temporary hedge against performance
|
||||
regressions during Zuul v4 development and can likely be removed
|
||||
once multiple schedulers are supported and possible tightening of
|
||||
database requirements.
|
||||
"""
|
||||
"""Obtain estimated build times"""
|
||||
|
||||
log = logging.getLogger("zuul.times")
|
||||
|
||||
def __init__(self, sql, statsd):
|
||||
self.sql = sql
|
||||
self.statsd = statsd
|
||||
self.queue = queue.Queue()
|
||||
self.cache = cachetools.TTLCache(8192, 3600)
|
||||
self.thread = threading.Thread(target=self.run)
|
||||
self.running = False
|
||||
|
||||
def start(self):
|
||||
self.running = True
|
||||
self.thread.start()
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
self.queue.put(None)
|
||||
pass
|
||||
|
||||
def join(self):
|
||||
return self.thread.join()
|
||||
|
||||
def run(self):
|
||||
while self.running:
|
||||
key = self.queue.get()
|
||||
if key is None:
|
||||
continue
|
||||
try:
|
||||
# Double check that we haven't added this key since it
|
||||
# was requested
|
||||
if key in self.cache:
|
||||
continue
|
||||
if self.statsd:
|
||||
with self.statsd.timer('zuul.scheduler.time_query'):
|
||||
self._getTime(key)
|
||||
else:
|
||||
self._getTime(key)
|
||||
except Exception:
|
||||
self.log.exception("Error querying DB for build %s", key)
|
||||
pass
|
||||
|
||||
def _getTime(self, key):
|
||||
tenant, project, branch, job = key
|
||||
@ -86,6 +51,8 @@ class Times:
|
||||
if times:
|
||||
estimate = float(sum(times)) / len(times)
|
||||
self.cache.setdefault(key, estimate)
|
||||
return estimate
|
||||
return None
|
||||
# Don't cache a zero value, so that new jobs get an estimated
|
||||
# time ASAP.
|
||||
|
||||
@ -95,5 +62,8 @@ class Times:
|
||||
if ret is not None:
|
||||
return ret
|
||||
|
||||
self.queue.put(key)
|
||||
return None
|
||||
if self.statsd:
|
||||
with self.statsd.timer('zuul.scheduler.time_query'):
|
||||
return self._getTime(key)
|
||||
else:
|
||||
return self._getTime(key)
|
||||
|
@ -757,14 +757,6 @@ class PipelineManager(metaclass=ABCMeta):
|
||||
else:
|
||||
relative_priority = 0
|
||||
for job in jobs:
|
||||
# Request an estimated time here in order to give the time
|
||||
# thread an opportunity to perform the SQL query in the
|
||||
# background if necessary.
|
||||
self.sched.times.getEstimatedTime(
|
||||
item.pipeline.tenant.name,
|
||||
item.change.project.name,
|
||||
getattr(item.change, 'branch', None),
|
||||
job.name)
|
||||
provider = self._getPausedParentProvider(build_set, job)
|
||||
priority = self._calculateNodeRequestPriority(build_set, job)
|
||||
tenant_name = build_set.item.pipeline.tenant.name
|
||||
|
Loading…
x
Reference in New Issue
Block a user