Decrease CPU usage of Prometheus exporter
Seems like a thread moving the metrics data into the service exposing them in the Prometheus format was not constrained with any sleep. This was causing increased CPU usage of kuryr-cni pods without a real reason to do it. This commit solves that by rewriting the thread to use the common multiprocessing queue pattern. Change-Id: I0eacc37022fbf214c361dbc52b42281ffa5301fd
This commit is contained in:
parent
410793a6a0
commit
fcc71cee92
@ -17,6 +17,7 @@ import errno
|
||||
from http import client as httplib
|
||||
import multiprocessing
|
||||
import os
|
||||
import queue
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
@ -90,18 +91,8 @@ class DaemonServer(object):
|
||||
|
||||
def _update_metrics(self, command, error, duration):
|
||||
"""Add a new metric value to the shared metrics dict"""
|
||||
params = {}
|
||||
try:
|
||||
params = self._prepare_request()
|
||||
except Exception:
|
||||
LOG.exception('Exception when reading CNI params.')
|
||||
return
|
||||
namespace = params.args.K8S_POD_NAMESPACE
|
||||
name = params.args.K8S_POD_NAME
|
||||
name = f'export-{namespace}/{name}'
|
||||
labels = {'command': command, 'error': error}
|
||||
with lockutils.lock(name):
|
||||
self.metrics[name] = {'labels': labels, 'duration': duration}
|
||||
self.metrics.put({'labels': labels, 'duration': duration})
|
||||
|
||||
@cni_utils.measure_time('ADD')
|
||||
def add(self):
|
||||
@ -374,13 +365,13 @@ class CNIDaemonExporterService(cotyledon.Service):
|
||||
|
||||
def _start_metric_updater(self):
|
||||
while self.is_running:
|
||||
if self.metrics:
|
||||
pod_name = list(self.metrics.keys())[0]
|
||||
with lockutils.lock(pod_name):
|
||||
labels = self.metrics[pod_name]['labels']
|
||||
duration = self.metrics[pod_name]['duration']
|
||||
self.prometheus_exporter.update_metric(labels, duration)
|
||||
del self.metrics[pod_name]
|
||||
try:
|
||||
metric = self.metrics.get(timeout=1)
|
||||
except queue.Empty:
|
||||
continue
|
||||
labels = metric['labels']
|
||||
duration = metric['duration']
|
||||
self.prometheus_exporter.update_metric(labels, duration)
|
||||
|
||||
def terminate(self):
|
||||
self.is_running = False
|
||||
@ -410,7 +401,7 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager):
|
||||
self.manager = multiprocessing.Manager()
|
||||
registry = self.manager.dict() # For Watcher->Server communication.
|
||||
healthy = multiprocessing.Value(c_bool, True)
|
||||
metrics = self.manager.dict()
|
||||
metrics = self.manager.Queue()
|
||||
self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,))
|
||||
self.add(CNIDaemonServerService, workers=1, args=(registry, healthy,
|
||||
metrics,))
|
||||
|
@ -12,6 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import queue
|
||||
from unittest import mock
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
@ -31,7 +32,7 @@ class TestDaemonServer(base.TestCase):
|
||||
self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient())
|
||||
self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin({}, healthy)
|
||||
self.health_registry = mock.Mock()
|
||||
self.metrics = dict()
|
||||
self.metrics = queue.Queue()
|
||||
self.srv = service.DaemonServer(
|
||||
self.plugin, self.health_registry, self.metrics)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user