Report pipeline stats.
* Number of changes processed. * Time each change spent in pipeline. * Current number of changes in pipeline. All of those per-pipeline, and per-pipeline, per-project. Change-Id: I609ff33ab7c26fc43d323f8a0bbb61f93382af90 Reviewed-on: https://review.openstack.org/19764 Reviewed-by: Clark Boylan <clark.boylan@gmail.com> Approved: James E. Blair <corvus@inaugust.com> Tested-by: Jenkins
This commit is contained in:
parent
cb46234854
commit
8fa1697ea9
|
@ -306,6 +306,7 @@ class ChangeQueue(object):
|
|||
change.change_ahead = self.queue[-1]
|
||||
change.change_ahead.change_behind = change
|
||||
self.queue.append(change)
|
||||
change.enqueue_time = time.time()
|
||||
|
||||
def dequeueChange(self, change):
|
||||
if change in self.queue:
|
||||
|
@ -318,6 +319,7 @@ class ChangeQueue(object):
|
|||
change.change_behind.change_ahead = change.change_ahead
|
||||
change.change_ahead = None
|
||||
change.change_behind = None
|
||||
change.dequeue_time = time.time()
|
||||
|
||||
def addSeveredHead(self, change):
|
||||
self.severed_heads.append(change)
|
||||
|
@ -495,6 +497,8 @@ class Changeish(object):
|
|||
self.build_sets.append(self.current_build_set)
|
||||
self.change_ahead = None
|
||||
self.change_behind = None
|
||||
self.enqueue_time = None
|
||||
self.dequeue_time = None
|
||||
|
||||
def equals(self, other):
|
||||
raise NotImplementedError()
|
||||
|
|
|
@ -598,6 +598,7 @@ class BasePipelineManager(object):
|
|||
if self.start_action:
|
||||
self.reportStart(change)
|
||||
change_queue.enqueueChange(change)
|
||||
self.reportStats(change)
|
||||
self.enqueueChangesBehind(change)
|
||||
else:
|
||||
self.log.error("Unable to find change queue for project %s" %
|
||||
|
@ -730,6 +731,7 @@ class BasePipelineManager(object):
|
|||
change)
|
||||
change_queue = self.pipeline.getQueue(change.project)
|
||||
change_queue.dequeueChange(change)
|
||||
self.reportStats(change)
|
||||
return True
|
||||
|
||||
def reportChange(self, change):
|
||||
|
@ -910,6 +912,37 @@ class BasePipelineManager(object):
|
|||
ret = ret.format(**locals())
|
||||
return ret
|
||||
|
||||
def reportStats(self, change):
|
||||
if not statsd:
|
||||
return
|
||||
try:
|
||||
# Update the guage on enqueue and dequeue, but timers only
|
||||
# when dequeing.
|
||||
if change.dequeue_time:
|
||||
dt = int((change.dequeue_time - change.enqueue_time) * 1000)
|
||||
else:
|
||||
dt = None
|
||||
changes = len(self.pipeline.getAllChanges())
|
||||
|
||||
# stats.timers.zuul.pipeline.NAME.resident_time
|
||||
# stats_counts.zuul.pipeline.NAME.total_changes
|
||||
# stats.gauges.zuul.pipeline.NAME.current_changes
|
||||
key = 'zuul.pipeline.%s' % self.pipeline.name
|
||||
statsd.gauge(key + '.current_changes', changes)
|
||||
if dt:
|
||||
statsd.timing(key + '.resident_time', dt)
|
||||
statsd.incr(key + '.total_changes')
|
||||
|
||||
# stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
|
||||
# stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
|
||||
project_name = change.project.name.replace('/', '.')
|
||||
key += '.%s' % project_name
|
||||
if dt:
|
||||
statsd.timing(key + '.resident_time', dt)
|
||||
statsd.incr(key + '.total_changes')
|
||||
except:
|
||||
self.log.exception("Exception reporting pipeline stats")
|
||||
|
||||
|
||||
class IndependentPipelineManager(BasePipelineManager):
|
||||
log = logging.getLogger("zuul.IndependentPipelineManager")
|
||||
|
|
Loading…
Reference in New Issue