diff --git a/utilities/pci-irq-affinity-agent/pci_irq_affinity/pci_irq_affinity/agent.py b/utilities/pci-irq-affinity-agent/pci_irq_affinity/pci_irq_affinity/agent.py index 5df10ca3..7d19898e 100644 --- a/utilities/pci-irq-affinity-agent/pci_irq_affinity/pci_irq_affinity/agent.py +++ b/utilities/pci-irq-affinity-agent/pci_irq_affinity/pci_irq_affinity/agent.py @@ -75,40 +75,78 @@ def audits_initialize(): return srv -class InstanceOnlineNotificationEndpoint(object): +class VersionedPayloadDecoder(object): + def decode_instance_host(self, payload): + instance_host = None + nova_object_data = payload.get('nova_object.data', None) + + if nova_object_data is not None: + instance_host = nova_object_data.get('host', None) + return instance_host + + def decode_instance_uuid(self, payload): + instance_uuid = None + nova_object_data = payload.get('nova_object.data', None) + + if nova_object_data is not None: + instance_uuid = nova_object_data.get('uuid', None) + + return instance_uuid + + +class UnversionedPayloadDecoder(object): + def decode_instance_host(self, payload): + return payload.get('host', None) + + def decode_instance_uuid(self, payload): + return payload.get('instance_id', None) + + +class BaseInstanceEndpoint(object): + def __init__(self, payload_decoder): + self.payload_decoder = payload_decoder + + +class InstanceOnlineNotificationEndpoint(BaseInstanceEndpoint): filter_rule = oslo_messaging.NotificationFilter( event_type=pci_utils.get_event_type_regexp(pci_utils.ONLINE_EVENT_TYPES) ) + def __init__(self, payload_decoder): + super(InstanceOnlineNotificationEndpoint, self).__init__(payload_decoder) + def info(self, ctxt, publisher_id, event_type, payload, metadata): - instance_host = payload.get('host', None) + instance_host = self.payload_decoder.decode_instance_host(payload) current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname()) - if instance_host != current_host: + if instance_host is not None and instance_host != current_host: LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % ( instance_host, current_host)) return oslo_messaging.NotificationResult.REQUEUE - instance_uuid = payload.get('instance_id', None) + instance_uuid = self.payload_decoder.decode_instance_uuid(payload) if instance_uuid: LOG.info("Instance online: uuid=%s, instance_host=%s, event_type=%s" % ( instance_uuid, instance_host, event_type)) eventlet.spawn(get_inst, instance_uuid, query_instance_callback).wait() -class InstanceOfflineNotificationEndpoint(object): +class InstanceOfflineNotificationEndpoint(BaseInstanceEndpoint): filter_rule = oslo_messaging.NotificationFilter( event_type=pci_utils.get_event_type_regexp(pci_utils.OFFLINE_EVENT_TYPES) ) + def __init__(self, payload_decoder): + super(InstanceOfflineNotificationEndpoint, self).__init__(payload_decoder) + def info(self, ctxt, publisher_id, event_type, payload, metadata): - instance_host = payload.get('host', None) + instance_host = self.payload_decoder.decode_instance_host(payload) current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname()) - if instance_host != current_host: + if instance_host is not None and instance_host != current_host: LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % ( instance_host, current_host)) return oslo_messaging.NotificationResult.REQUEUE - instance_uuid = payload.get('instance_id', None) + instance_uuid = self.payload_decoder.decode_instance_uuid(payload) if instance_uuid: LOG.info("Instance offline: uuid=%s, instance_host=%s, event_type=%s" % ( instance_uuid, instance_host, event_type)) @@ -125,14 +163,21 @@ def start_rabbitmq_client(): cfg = CONF.amqp rabbit_url = "rabbit://%s:%s@%s:%s/%s" % (cfg['user_id'], cfg['password'], cfg['host'], cfg['port'], cfg['virt_host']) + topic = cfg['topic'] LOG.info(rabbit_url) - target = oslo_messaging.Target(exchange="nova", topic="notifications", server="info", + target = oslo_messaging.Target(exchange="nova", topic=topic, server="info", version="2.1", fanout=True) transport = oslo_messaging.get_notification_transport(CONF, url=rabbit_url) + + payload_decoder = UnversionedPayloadDecoder() + + if topic == 'versioned_notifications': + payload_decoder = VersionedPayloadDecoder() + endpoints = [ - InstanceOnlineNotificationEndpoint(), - InstanceOfflineNotificationEndpoint(), + InstanceOnlineNotificationEndpoint(payload_decoder), + InstanceOfflineNotificationEndpoint(payload_decoder), ] server = oslo_messaging.get_notification_listener(transport, [target],