Merge "Make time estimation synchronous"
This commit is contained in:
commit
26ceb3fee8
|
@ -13,63 +13,28 @@
|
|||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import queue
|
||||
|
||||
import cachetools
|
||||
|
||||
|
||||
class Times:
|
||||
"""Perform asynchronous database queries to estimate build times.
|
||||
|
||||
To avoid allowing the SQL database to become a bottelneck when
|
||||
launching builds, this class performs asynchronous queries against
|
||||
the DB and returns estimated build times.
|
||||
|
||||
This is intended as a temporary hedge against performance
|
||||
regressions during Zuul v4 development and can likely be removed
|
||||
once multiple schedulers are supported and possible tightening of
|
||||
database requirements.
|
||||
"""
|
||||
"""Obtain estimated build times"""
|
||||
|
||||
log = logging.getLogger("zuul.times")
|
||||
|
||||
def __init__(self, sql, statsd):
|
||||
self.sql = sql
|
||||
self.statsd = statsd
|
||||
self.queue = queue.Queue()
|
||||
self.cache = cachetools.TTLCache(8192, 3600)
|
||||
self.thread = threading.Thread(target=self.run)
|
||||
self.running = False
|
||||
|
||||
def start(self):
|
||||
self.running = True
|
||||
self.thread.start()
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
self.queue.put(None)
|
||||
pass
|
||||
|
||||
def join(self):
|
||||
return self.thread.join()
|
||||
|
||||
def run(self):
|
||||
while self.running:
|
||||
key = self.queue.get()
|
||||
if key is None:
|
||||
continue
|
||||
try:
|
||||
# Double check that we haven't added this key since it
|
||||
# was requested
|
||||
if key in self.cache:
|
||||
continue
|
||||
if self.statsd:
|
||||
with self.statsd.timer('zuul.scheduler.time_query'):
|
||||
self._getTime(key)
|
||||
else:
|
||||
self._getTime(key)
|
||||
except Exception:
|
||||
self.log.exception("Error querying DB for build %s", key)
|
||||
pass
|
||||
|
||||
def _getTime(self, key):
|
||||
tenant, project, branch, job = key
|
||||
|
@ -86,6 +51,8 @@ class Times:
|
|||
if times:
|
||||
estimate = float(sum(times)) / len(times)
|
||||
self.cache.setdefault(key, estimate)
|
||||
return estimate
|
||||
return None
|
||||
# Don't cache a zero value, so that new jobs get an estimated
|
||||
# time ASAP.
|
||||
|
||||
|
@ -95,5 +62,8 @@ class Times:
|
|||
if ret is not None:
|
||||
return ret
|
||||
|
||||
self.queue.put(key)
|
||||
return None
|
||||
if self.statsd:
|
||||
with self.statsd.timer('zuul.scheduler.time_query'):
|
||||
return self._getTime(key)
|
||||
else:
|
||||
return self._getTime(key)
|
||||
|
|
|
@ -757,14 +757,6 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
else:
|
||||
relative_priority = 0
|
||||
for job in jobs:
|
||||
# Request an estimated time here in order to give the time
|
||||
# thread an opportunity to perform the SQL query in the
|
||||
# background if necessary.
|
||||
self.sched.times.getEstimatedTime(
|
||||
item.pipeline.tenant.name,
|
||||
item.change.project.name,
|
||||
getattr(item.change, 'branch', None),
|
||||
job.name)
|
||||
provider = self._getPausedParentProvider(build_set, job)
|
||||
priority = self._calculateNodeRequestPriority(build_set, job)
|
||||
tenant_name = build_set.item.pipeline.tenant.name
|
||||
|
|
Loading…
Reference in New Issue