Merge "Add versioned payload decoder"
This commit is contained in:
commit
ed01aa74e1
@ -75,40 +75,78 @@ def audits_initialize():
|
|||||||
return srv
|
return srv
|
||||||
|
|
||||||
|
|
||||||
class InstanceOnlineNotificationEndpoint(object):
|
class VersionedPayloadDecoder(object):
|
||||||
|
def decode_instance_host(self, payload):
|
||||||
|
instance_host = None
|
||||||
|
nova_object_data = payload.get('nova_object.data', None)
|
||||||
|
|
||||||
|
if nova_object_data is not None:
|
||||||
|
instance_host = nova_object_data.get('host', None)
|
||||||
|
return instance_host
|
||||||
|
|
||||||
|
def decode_instance_uuid(self, payload):
|
||||||
|
instance_uuid = None
|
||||||
|
nova_object_data = payload.get('nova_object.data', None)
|
||||||
|
|
||||||
|
if nova_object_data is not None:
|
||||||
|
instance_uuid = nova_object_data.get('uuid', None)
|
||||||
|
|
||||||
|
return instance_uuid
|
||||||
|
|
||||||
|
|
||||||
|
class UnversionedPayloadDecoder(object):
|
||||||
|
def decode_instance_host(self, payload):
|
||||||
|
return payload.get('host', None)
|
||||||
|
|
||||||
|
def decode_instance_uuid(self, payload):
|
||||||
|
return payload.get('instance_id', None)
|
||||||
|
|
||||||
|
|
||||||
|
class BaseInstanceEndpoint(object):
|
||||||
|
def __init__(self, payload_decoder):
|
||||||
|
self.payload_decoder = payload_decoder
|
||||||
|
|
||||||
|
|
||||||
|
class InstanceOnlineNotificationEndpoint(BaseInstanceEndpoint):
|
||||||
filter_rule = oslo_messaging.NotificationFilter(
|
filter_rule = oslo_messaging.NotificationFilter(
|
||||||
event_type=pci_utils.get_event_type_regexp(pci_utils.ONLINE_EVENT_TYPES)
|
event_type=pci_utils.get_event_type_regexp(pci_utils.ONLINE_EVENT_TYPES)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def __init__(self, payload_decoder):
|
||||||
|
super(InstanceOnlineNotificationEndpoint, self).__init__(payload_decoder)
|
||||||
|
|
||||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||||
instance_host = payload.get('host', None)
|
instance_host = self.payload_decoder.decode_instance_host(payload)
|
||||||
current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname())
|
current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname())
|
||||||
if instance_host != current_host:
|
if instance_host is not None and instance_host != current_host:
|
||||||
LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % (
|
LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % (
|
||||||
instance_host, current_host))
|
instance_host, current_host))
|
||||||
return oslo_messaging.NotificationResult.REQUEUE
|
return oslo_messaging.NotificationResult.REQUEUE
|
||||||
|
|
||||||
instance_uuid = payload.get('instance_id', None)
|
instance_uuid = self.payload_decoder.decode_instance_uuid(payload)
|
||||||
if instance_uuid:
|
if instance_uuid:
|
||||||
LOG.info("Instance online: uuid=%s, instance_host=%s, event_type=%s" % (
|
LOG.info("Instance online: uuid=%s, instance_host=%s, event_type=%s" % (
|
||||||
instance_uuid, instance_host, event_type))
|
instance_uuid, instance_host, event_type))
|
||||||
eventlet.spawn(get_inst, instance_uuid, query_instance_callback).wait()
|
eventlet.spawn(get_inst, instance_uuid, query_instance_callback).wait()
|
||||||
|
|
||||||
|
|
||||||
class InstanceOfflineNotificationEndpoint(object):
|
class InstanceOfflineNotificationEndpoint(BaseInstanceEndpoint):
|
||||||
filter_rule = oslo_messaging.NotificationFilter(
|
filter_rule = oslo_messaging.NotificationFilter(
|
||||||
event_type=pci_utils.get_event_type_regexp(pci_utils.OFFLINE_EVENT_TYPES)
|
event_type=pci_utils.get_event_type_regexp(pci_utils.OFFLINE_EVENT_TYPES)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def __init__(self, payload_decoder):
|
||||||
|
super(InstanceOfflineNotificationEndpoint, self).__init__(payload_decoder)
|
||||||
|
|
||||||
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
def info(self, ctxt, publisher_id, event_type, payload, metadata):
|
||||||
instance_host = payload.get('host', None)
|
instance_host = self.payload_decoder.decode_instance_host(payload)
|
||||||
current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname())
|
current_host = os.getenv("COMPUTE_HOSTNAME", default=socket.gethostname())
|
||||||
if instance_host != current_host:
|
if instance_host is not None and instance_host != current_host:
|
||||||
LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % (
|
LOG.debug("Requeue notification: instance_host=%s != current_host=%s" % (
|
||||||
instance_host, current_host))
|
instance_host, current_host))
|
||||||
return oslo_messaging.NotificationResult.REQUEUE
|
return oslo_messaging.NotificationResult.REQUEUE
|
||||||
|
|
||||||
instance_uuid = payload.get('instance_id', None)
|
instance_uuid = self.payload_decoder.decode_instance_uuid(payload)
|
||||||
if instance_uuid:
|
if instance_uuid:
|
||||||
LOG.info("Instance offline: uuid=%s, instance_host=%s, event_type=%s" % (
|
LOG.info("Instance offline: uuid=%s, instance_host=%s, event_type=%s" % (
|
||||||
instance_uuid, instance_host, event_type))
|
instance_uuid, instance_host, event_type))
|
||||||
@ -125,14 +163,21 @@ def start_rabbitmq_client():
|
|||||||
cfg = CONF.amqp
|
cfg = CONF.amqp
|
||||||
rabbit_url = "rabbit://%s:%s@%s:%s/%s" % (cfg['user_id'], cfg['password'],
|
rabbit_url = "rabbit://%s:%s@%s:%s/%s" % (cfg['user_id'], cfg['password'],
|
||||||
cfg['host'], cfg['port'], cfg['virt_host'])
|
cfg['host'], cfg['port'], cfg['virt_host'])
|
||||||
|
topic = cfg['topic']
|
||||||
LOG.info(rabbit_url)
|
LOG.info(rabbit_url)
|
||||||
|
|
||||||
target = oslo_messaging.Target(exchange="nova", topic="notifications", server="info",
|
target = oslo_messaging.Target(exchange="nova", topic=topic, server="info",
|
||||||
version="2.1", fanout=True)
|
version="2.1", fanout=True)
|
||||||
transport = oslo_messaging.get_notification_transport(CONF, url=rabbit_url)
|
transport = oslo_messaging.get_notification_transport(CONF, url=rabbit_url)
|
||||||
|
|
||||||
|
payload_decoder = UnversionedPayloadDecoder()
|
||||||
|
|
||||||
|
if topic == 'versioned_notifications':
|
||||||
|
payload_decoder = VersionedPayloadDecoder()
|
||||||
|
|
||||||
endpoints = [
|
endpoints = [
|
||||||
InstanceOnlineNotificationEndpoint(),
|
InstanceOnlineNotificationEndpoint(payload_decoder),
|
||||||
InstanceOfflineNotificationEndpoint(),
|
InstanceOfflineNotificationEndpoint(payload_decoder),
|
||||||
]
|
]
|
||||||
|
|
||||||
server = oslo_messaging.get_notification_listener(transport, [target],
|
server = oslo_messaging.get_notification_listener(transport, [target],
|
||||||
|
Loading…
x
Reference in New Issue
Block a user