From fd000b8c131b3b5d23d2c45233353107536565e1 Mon Sep 17 00:00:00 2001 From: zhang-shaoman Date: Sun, 5 May 2019 16:25:47 +0800 Subject: [PATCH] metering data lost randomly Thread lock has been added for flush(), while queue may be modified in other places, which has not been protected, so need to add lock in such places too. Change-Id: I6b46f07690f2fc165fe27bf509ee8926656fd6b9 Closes-Bug: #1827768 --- ceilometer/publisher/messaging.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ceilometer/publisher/messaging.py b/ceilometer/publisher/messaging.py index d6a3ed2ac7..a4e9f8a31d 100644 --- a/ceilometer/publisher/messaging.py +++ b/ceilometer/publisher/messaging.py @@ -112,9 +112,11 @@ class MessagingPublisher(publisher.ConfigPublisherBase): for sample in samples ] topic = self.conf.publisher_notifier.metering_topic - self.local_queue.append((topic, meters)) + with self.queue_lock: + self.local_queue.append((topic, meters)) if self.per_meter_topic: + queue_per_meter_topic = [] for meter_name, meter_list in itertools.groupby( sorted(meters, key=operator.itemgetter('counter_name')), operator.itemgetter('counter_name')): @@ -122,7 +124,9 @@ class MessagingPublisher(publisher.ConfigPublisherBase): topic_name = topic + '.' + meter_name LOG.debug('Publishing %(m)d samples on %(n)s', {'m': len(meter_list), 'n': topic_name}) - self.local_queue.append((topic_name, meter_list)) + queue_per_meter_topic.append((topic_name, meter_list)) + with self.queue_lock: + self.local_queue.extend(queue_per_meter_topic) self.flush() @@ -180,7 +184,8 @@ class MessagingPublisher(publisher.ConfigPublisherBase): event, self.conf.publisher.telemetry_secret) for event in events] topic = self.conf.publisher_notifier.event_topic - self.local_queue.append((topic, ev_list)) + with self.queue_lock: + self.local_queue.append((topic, ev_list)) self.flush() @abc.abstractmethod