Add versioned payload decoder
This change decouples the RPC payload decoding into two subcategories based on the topic RabbitMQ topic. This is needed because the nova has two different notifiers and they differ in some aspects. Test plan: PASS: Test interface attach and detach notification listening with unversioned payload decoder. PASS: Test interface attach and detach notification listening with versioned payload decoder. Story: 2009299 Task: 44236 Depends-On: https://review.opendev.org/c/starlingx/utilities/+/823021 Signed-off-by: Iago Estrela <IagoFilipe.EstrelaBarros@windriver.com> Change-Id: I1a36652062a41a58d263d951e76c71b77bbb2744
This commit is contained in:
parent
c084efc3b8
commit
59aaea57f2
@ -75,40 +75,78 @@ def audits_initialize():
|
||||
return srv
|
||||
|
||||
|
||||
class InstanceOnlineNotificationEndpoint(object):
|
||||
class VersionedPayloadDecoder(object):
|
||||
def decode_instance_host(self, payload):
|
||||
instance_host = None
|
||||
nova_object_data = payload.get('nova_object.data', None)
|
||||
|
||||
if nova_object_data is not None:
|
||||
instance_host = nova_object_data.get('host', None)
|
||||
return instance_host
|
||||
|
||||
def decode_instance_uuid(self, payload):
|
||||
instance_uuid = None
|
||||
nova_object_data = payload.get('nova_object.data', None)
|
||||
|
||||
if nova_object_data is not None:
|
||||
instance_uuid = nova_object_data.get('uuid', None)
|
||||
|
||||
return instance_uuid
|
||||
|
||||
|
||||
class UnversionedPayloadDecoder(object):
|
||||
def decode_instance_host(self, payload):
|
||||
return payload.get('host', None)
|
||||
|
||||
def decode_instance_uuid(self, payload):
|
||||
return payload.get('instance_id', None)
|
||||
|
||||
|
||||
class BaseInstanceEndpoint(object):
|
||||
def __init__(self, payload_decoder):
|
||||
self.payload_decoder = payload_decoder
|
||||
|
||||
|
||||
class InstanceOnlineNotificationEndpoint(BaseInstanceEndpoint):
|
||||
filter_rule = oslo_messaging.NotificationFilter(
|
||||
event_type=pci_utils.get_event_type_regexp(pci_utils.ONLINE_EVENT_TYPES)
|
||||
)
|
||||
|
||||
def __init__(self, payload_decoder):
|
||||
super(InstanceOnlineNotificationEndpoint, self).__init__(payload_decoder)
|
||||
|
||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
instance_host = payload.get('host', None)
|
||||
instance_host = self.payload_decoder.decode_instance_host(payload)
|
||||
current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname())
|
||||
if instance_host != current_host:
|
||||
if instance_host is not None and instance_host != current_host:
|
||||
LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % (
|
||||
instance_host, current_host))
|
||||
return oslo_messaging.NotificationResult.REQUEUE
|
||||
|
||||
instance_uuid = payload.get('instance_id', None)
|
||||
instance_uuid = self.payload_decoder.decode_instance_uuid(payload)
|
||||
if instance_uuid:
|
||||
LOG.info("Instance online: uuid=%s, instance_host=%s, event_type=%s" % (
|
||||
instance_uuid, instance_host, event_type))
|
||||
eventlet.spawn(get_inst, instance_uuid, query_instance_callback).wait()
|
||||
|
||||
|
||||
class InstanceOfflineNotificationEndpoint(object):
|
||||
class InstanceOfflineNotificationEndpoint(BaseInstanceEndpoint):
|
||||
filter_rule = oslo_messaging.NotificationFilter(
|
||||
event_type=pci_utils.get_event_type_regexp(pci_utils.OFFLINE_EVENT_TYPES)
|
||||
)
|
||||
|
||||
def __init__(self, payload_decoder):
|
||||
super(InstanceOfflineNotificationEndpoint, self).__init__(payload_decoder)
|
||||
|
||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||
instance_host = payload.get('host', None)
|
||||
instance_host = self.payload_decoder.decode_instance_host(payload)
|
||||
current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname())
|
||||
if instance_host != current_host:
|
||||
if instance_host is not None and instance_host != current_host:
|
||||
LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % (
|
||||
instance_host, current_host))
|
||||
return oslo_messaging.NotificationResult.REQUEUE
|
||||
|
||||
instance_uuid = payload.get('instance_id', None)
|
||||
instance_uuid = self.payload_decoder.decode_instance_uuid(payload)
|
||||
if instance_uuid:
|
||||
LOG.info("Instance offline: uuid=%s, instance_host=%s, event_type=%s" % (
|
||||
instance_uuid, instance_host, event_type))
|
||||
@ -125,14 +163,21 @@ def start_rabbitmq_client():
|
||||
cfg = CONF.amqp
|
||||
rabbit_url = "rabbit://%s:%s@%s:%s/%s" % (cfg['user_id'], cfg['password'],
|
||||
cfg['host'], cfg['port'], cfg['virt_host'])
|
||||
topic = cfg['topic']
|
||||
LOG.info(rabbit_url)
|
||||
|
||||
target = oslo_messaging.Target(exchange="nova", topic="notifications", server="info",
|
||||
target = oslo_messaging.Target(exchange="nova", topic=topic, server="info",
|
||||
version="2.1", fanout=True)
|
||||
transport = oslo_messaging.get_notification_transport(CONF, url=rabbit_url)
|
||||
|
||||
payload_decoder = UnversionedPayloadDecoder()
|
||||
|
||||
if topic == 'versioned_notifications':
|
||||
payload_decoder = VersionedPayloadDecoder()
|
||||
|
||||
endpoints = [
|
||||
InstanceOnlineNotificationEndpoint(),
|
||||
InstanceOfflineNotificationEndpoint(),
|
||||
InstanceOnlineNotificationEndpoint(payload_decoder),
|
||||
InstanceOfflineNotificationEndpoint(payload_decoder),
|
||||
]
|
||||
|
||||
server = oslo_messaging.get_notification_listener(transport, [target],
|
||||
|
Loading…
x
Reference in New Issue
Block a user