Merge "Use merger API for merger stats"
This commit is contained in:
commit
ac9b62e4b5
|
@ -1,42 +0,0 @@
|
|||
# Copyright 2018 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import gear
|
||||
import logging
|
||||
|
||||
log = logging.getLogger("zuul.gear_utils")
|
||||
|
||||
|
||||
def getGearmanFunctions(gearman):
|
||||
functions = {}
|
||||
for connection in gearman.active_connections:
|
||||
try:
|
||||
req = gear.StatusAdminRequest()
|
||||
connection.sendAdminRequest(req, timeout=300)
|
||||
except Exception:
|
||||
log.exception("Exception while listing functions")
|
||||
gearman._lostConnection(connection)
|
||||
continue
|
||||
for line in req.response.decode('utf8').split('\n'):
|
||||
parts = [x.strip() for x in line.split('\t')]
|
||||
if len(parts) < 4:
|
||||
continue
|
||||
# parts[0] - function name
|
||||
# parts[1] - total jobs queued (including building)
|
||||
# parts[2] - jobs building
|
||||
# parts[3] - workers registered
|
||||
data = functions.setdefault(parts[0], [0, 0, 0])
|
||||
for i in range(3):
|
||||
data[i] += int(parts[i + 1])
|
||||
return functions
|
|
@ -34,7 +34,6 @@ from zuul import rpclistener
|
|||
from zuul.lib import commandsocket
|
||||
from zuul.lib.ansible import AnsibleManager
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.gear_utils import getGearmanFunctions
|
||||
from zuul.lib.keystorage import KeyStorage
|
||||
from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.lib.queue import NamedQueue
|
||||
|
@ -397,10 +396,6 @@ class Scheduler(threading.Thread):
|
|||
if executor_component.accepting_work:
|
||||
executors_accepting += 1
|
||||
|
||||
for merger_component in self.component_registry.all("merger"):
|
||||
if merger_component.state == BaseComponent.RUNNING:
|
||||
mergers_online += 1
|
||||
|
||||
# Get all builds so we can filter by state and zone
|
||||
for build_request in self.executor.executor_api.inState():
|
||||
zone_stats = zoned_executor_stats.setdefault(
|
||||
|
@ -452,18 +447,19 @@ class Scheduler(threading.Thread):
|
|||
self.statsd.gauge('zuul.executors.online', executors_online)
|
||||
|
||||
# Publish the merger stats
|
||||
self.statsd.gauge('zuul.mergers.online', mergers_online)
|
||||
for merger_component in self.component_registry.all("merger"):
|
||||
if merger_component.state == BaseComponent.RUNNING:
|
||||
mergers_online += 1
|
||||
|
||||
functions = getGearmanFunctions(self.rpc.gearworker.gearman)
|
||||
functions.update(getGearmanFunctions(self.rpc_slow.gearworker.gearman))
|
||||
merge_queue = 0
|
||||
merge_running = 0
|
||||
for (name, (queued, running, registered)) in functions.items():
|
||||
if name == 'merger:merge':
|
||||
mergers_online = registered
|
||||
if name.startswith('merger:'):
|
||||
merge_queue += queued - running
|
||||
merge_running += running
|
||||
for merge_request in self.merger.merger_api.inState():
|
||||
if merge_request.state == merge_request.REQUESTED:
|
||||
merge_queue += 1
|
||||
if merge_request.state == merge_request.RUNNING:
|
||||
merge_running += 1
|
||||
|
||||
self.statsd.gauge('zuul.mergers.online', mergers_online)
|
||||
self.statsd.gauge('zuul.mergers.jobs_running', merge_running)
|
||||
self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue)
|
||||
self.statsd.gauge('zuul.scheduler.eventqueues.management',
|
||||
|
|
Loading…
Reference in New Issue