server: make stats more efficient

Keep track of the three main queue stats we report, so that we don't
have to iterate over all the queued jobs to determine whether they
are running.

Also, drop the workers stat because it's not very useful and not
entirely trivial to calculate.

Change-Id: Id42a05e5626096d1100a9cb9e8166c8ec5103b41
This commit is contained in:
James E. Blair 2017-10-30 11:46:08 -07:00
parent c4a5decdc6
commit 49d4bef593
1 changed files with 15 additions and 26 deletions

View File

@ -2714,6 +2714,9 @@ class Server(BaseClientServer):
self.normal_queue = [] self.normal_queue = []
self.low_queue = [] self.low_queue = []
self.jobs = {} self.jobs = {}
self.running_jobs = 0
self.waiting_jobs = 0
self.total_jobs = 0
self.functions = set() self.functions = set()
self.max_handle = 0 self.max_handle = 0
self.acl = acl self.acl = acl
@ -3039,6 +3042,11 @@ class Server(BaseClientServer):
# A specific queue was supplied # A specific queue was supplied
dequeue.remove(job) dequeue.remove(job)
# If dequeue is false, no need to remove from any queue # If dequeue is false, no need to remove from any queue
self.total_jobs -= 1
if job.running:
self.running_jobs -= 1
else:
self.waiting_jobs -= 1
def getQueue(self): def getQueue(self):
"""Returns a copy of all internal queues in a flattened form. """Returns a copy of all internal queues in a flattened form.
@ -3278,32 +3286,9 @@ class Server(BaseClientServer):
# prefix.queue.total # prefix.queue.total
# prefix.queue.running # prefix.queue.running
# prefix.queue.waiting # prefix.queue.waiting
# prefix.workers self.statsd.gauge('queue.total', self.total_jobs)
base_key = 'queue' self.statsd.gauge('queue.running', self.running_jobs)
total = 0 self.statsd.gauge('queue.waiting', self.waiting_jobs)
running = 0
waiting = 0
for job in self.jobs.values():
total += 1
if job.running:
running += 1
else:
waiting += 1
key = '.'.join([base_key, 'total'])
self.statsd.gauge(key, total)
key = '.'.join([base_key, 'running'])
self.statsd.gauge(key, running)
key = '.'.join([base_key, 'waiting'])
self.statsd.gauge(key, waiting)
workers = 0
for connection in self.active_connections:
if connection.functions:
workers += 1
self.statsd.gauge('workers', workers)
def _handleSubmitJob(self, packet, precedence, background=False): def _handleSubmitJob(self, packet, precedence, background=False):
name = packet.getArgument(0) name = packet.getArgument(0)
@ -3330,6 +3315,8 @@ class Server(BaseClientServer):
p = Packet(constants.RES, constants.JOB_CREATED, handle) p = Packet(constants.RES, constants.JOB_CREATED, handle)
packet.connection.sendPacket(p) packet.connection.sendPacket(p)
self.jobs[handle] = job self.jobs[handle] = job
self.total_jobs += 1
self.waiting_jobs += 1
if not background: if not background:
packet.connection.related_jobs[handle] = job packet.connection.related_jobs[handle] = job
if precedence == PRECEDENCE_HIGH: if precedence == PRECEDENCE_HIGH:
@ -3369,6 +3356,8 @@ class Server(BaseClientServer):
connection.related_jobs[job.handle] = job connection.related_jobs[job.handle] = job
job.worker_connection = connection job.worker_connection = connection
job.running = True job.running = True
self.waiting_jobs -= 1
self.running_jobs += 1
self._updateStats() self._updateStats()
return job return job
return None return None