diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index 0d510402a..3b9f19f8b 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -17,6 +17,7 @@ import errno from http import client as httplib import multiprocessing import os +import queue import socket import sys import threading @@ -90,18 +91,8 @@ class DaemonServer(object): def _update_metrics(self, command, error, duration): """Add a new metric value to the shared metrics dict""" - params = {} - try: - params = self._prepare_request() - except Exception: - LOG.exception('Exception when reading CNI params.') - return - namespace = params.args.K8S_POD_NAMESPACE - name = params.args.K8S_POD_NAME - name = f'export-{namespace}/{name}' labels = {'command': command, 'error': error} - with lockutils.lock(name): - self.metrics[name] = {'labels': labels, 'duration': duration} + self.metrics.put({'labels': labels, 'duration': duration}) @cni_utils.measure_time('ADD') def add(self): @@ -374,13 +365,13 @@ class CNIDaemonExporterService(cotyledon.Service): def _start_metric_updater(self): while self.is_running: - if self.metrics: - pod_name = list(self.metrics.keys())[0] - with lockutils.lock(pod_name): - labels = self.metrics[pod_name]['labels'] - duration = self.metrics[pod_name]['duration'] - self.prometheus_exporter.update_metric(labels, duration) - del self.metrics[pod_name] + try: + metric = self.metrics.get(timeout=1) + except queue.Empty: + continue + labels = metric['labels'] + duration = metric['duration'] + self.prometheus_exporter.update_metric(labels, duration) def terminate(self): self.is_running = False @@ -410,7 +401,7 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager): self.manager = multiprocessing.Manager() registry = self.manager.dict() # For Watcher->Server communication. healthy = multiprocessing.Value(c_bool, True) - metrics = self.manager.dict() + metrics = self.manager.Queue() self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,)) self.add(CNIDaemonServerService, workers=1, args=(registry, healthy, metrics,)) diff --git a/kuryr_kubernetes/tests/unit/cni/test_service.py b/kuryr_kubernetes/tests/unit/cni/test_service.py index 4add4f54e..f438fd085 100644 --- a/kuryr_kubernetes/tests/unit/cni/test_service.py +++ b/kuryr_kubernetes/tests/unit/cni/test_service.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import queue from unittest import mock from oslo_serialization import jsonutils @@ -31,7 +32,7 @@ class TestDaemonServer(base.TestCase): self.k8s_mock = self.useFixture(kuryr_fixtures.MockK8sClient()) self.plugin = k8s_cni_registry.K8sCNIRegistryPlugin({}, healthy) self.health_registry = mock.Mock() - self.metrics = dict() + self.metrics = queue.Queue() self.srv = service.DaemonServer( self.plugin, self.health_registry, self.metrics)