From 59aaea57f24d4fbce62e072ecc27a4e78e100767 Mon Sep 17 00:00:00 2001 From: Iago Estrela <IagoFilipe.EstrelaBarros@windriver.com> Date: Mon, 27 Dec 2021 14:29:06 -0300 Subject: [PATCH] Add versioned payload decoder This change decouples the RPC payload decoding into two subcategories based on the topic RabbitMQ topic. This is needed because the nova has two different notifiers and they differ in some aspects. Test plan: PASS: Test interface attach and detach notification listening with unversioned payload decoder. PASS: Test interface attach and detach notification listening with versioned payload decoder. Story: 2009299 Task: 44236 Depends-On: https://review.opendev.org/c/starlingx/utilities/+/823021 Signed-off-by: Iago Estrela <IagoFilipe.EstrelaBarros@windriver.com> Change-Id: I1a36652062a41a58d263d951e76c71b77bbb2744 --- .../pci_irq_affinity/agent.py | 67 ++++++++++++++++--- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/utilities/pci-irq-affinity-agent/pci_irq_affinity/pci_irq_affinity/agent.py b/utilities/pci-irq-affinity-agent/pci_irq_affinity/pci_irq_affinity/agent.py index 5df10ca3..7d19898e 100644 --- a/utilities/pci-irq-affinity-agent/pci_irq_affinity/pci_irq_affinity/agent.py +++ b/utilities/pci-irq-affinity-agent/pci_irq_affinity/pci_irq_affinity/agent.py @@ -75,40 +75,78 @@ def audits_initialize(): return srv -class InstanceOnlineNotificationEndpoint(object): +class VersionedPayloadDecoder(object): + def decode_instance_host(self, payload): + instance_host = None + nova_object_data = payload.get('nova_object.data', None) + + if nova_object_data is not None: + instance_host = nova_object_data.get('host', None) + return instance_host + + def decode_instance_uuid(self, payload): + instance_uuid = None + nova_object_data = payload.get('nova_object.data', None) + + if nova_object_data is not None: + instance_uuid = nova_object_data.get('uuid', None) + + return instance_uuid + + +class UnversionedPayloadDecoder(object): + def decode_instance_host(self, payload): + return payload.get('host', None) + + def decode_instance_uuid(self, payload): + return payload.get('instance_id', None) + + +class BaseInstanceEndpoint(object): + def __init__(self, payload_decoder): + self.payload_decoder = payload_decoder + + +class InstanceOnlineNotificationEndpoint(BaseInstanceEndpoint): filter_rule = oslo_messaging.NotificationFilter( event_type=pci_utils.get_event_type_regexp(pci_utils.ONLINE_EVENT_TYPES) ) + def __init__(self, payload_decoder): + super(InstanceOnlineNotificationEndpoint, self).__init__(payload_decoder) + def info(self, ctxt, publisher_id, event_type, payload, metadata): - instance_host = payload.get('host', None) + instance_host = self.payload_decoder.decode_instance_host(payload) current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname()) - if instance_host != current_host: + if instance_host is not None and instance_host != current_host: LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % ( instance_host, current_host)) return oslo_messaging.NotificationResult.REQUEUE - instance_uuid = payload.get('instance_id', None) + instance_uuid = self.payload_decoder.decode_instance_uuid(payload) if instance_uuid: LOG.info("Instance online: uuid=%s, instance_host=%s, event_type=%s" % ( instance_uuid, instance_host, event_type)) eventlet.spawn(get_inst, instance_uuid, query_instance_callback).wait() -class InstanceOfflineNotificationEndpoint(object): +class InstanceOfflineNotificationEndpoint(BaseInstanceEndpoint): filter_rule = oslo_messaging.NotificationFilter( event_type=pci_utils.get_event_type_regexp(pci_utils.OFFLINE_EVENT_TYPES) ) + def __init__(self, payload_decoder): + super(InstanceOfflineNotificationEndpoint, self).__init__(payload_decoder) + def info(self, ctxt, publisher_id, event_type, payload, metadata): - instance_host = payload.get('host', None) + instance_host = self.payload_decoder.decode_instance_host(payload) current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname()) - if instance_host != current_host: + if instance_host is not None and instance_host != current_host: LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % ( instance_host, current_host)) return oslo_messaging.NotificationResult.REQUEUE - instance_uuid = payload.get('instance_id', None) + instance_uuid = self.payload_decoder.decode_instance_uuid(payload) if instance_uuid: LOG.info("Instance offline: uuid=%s, instance_host=%s, event_type=%s" % ( instance_uuid, instance_host, event_type)) @@ -125,14 +163,21 @@ def start_rabbitmq_client(): cfg = CONF.amqp rabbit_url = "rabbit://%s:%s@%s:%s/%s" % (cfg['user_id'], cfg['password'], cfg['host'], cfg['port'], cfg['virt_host']) + topic = cfg['topic'] LOG.info(rabbit_url) - target = oslo_messaging.Target(exchange="nova", topic="notifications", server="info", + target = oslo_messaging.Target(exchange="nova", topic=topic, server="info", version="2.1", fanout=True) transport = oslo_messaging.get_notification_transport(CONF, url=rabbit_url) + + payload_decoder = UnversionedPayloadDecoder() + + if topic == 'versioned_notifications': + payload_decoder = VersionedPayloadDecoder() + endpoints = [ - InstanceOnlineNotificationEndpoint(), - InstanceOfflineNotificationEndpoint(), + InstanceOnlineNotificationEndpoint(payload_decoder), + InstanceOfflineNotificationEndpoint(payload_decoder), ] server = oslo_messaging.get_notification_listener(transport, [target],