Refactor notifications

Make a registry of different types of processed events
instead of an endless if/elif clause.

Change-Id: I34ebdca82810b9abd46a84aca7f1a8febf718be6
This commit is contained in:
Grzegorz Grasza 2018-11-22 09:57:35 +01:00
parent fe72231faa
commit 7fa5789e51
1 changed files with 143 additions and 110 deletions

View File

@ -43,6 +43,10 @@ LOG = logging.getLogger(__name__)
BACKOFF = 2
def ipaclient():
return IPAClient(backoff=BACKOFF)
def novaclient():
session = get_session()
return nova_client.Client('2.1', session=session)
@ -53,6 +57,14 @@ def neutronclient():
return neutron_client.Client(session=session)
class Registry(dict):
def __call__(self, name):
def decorator(fun):
self[name] = fun
return fun
return decorator
class NotificationEndpoint(object):
filter_rule = oslo_messaging.notify.filter.NotificationFilter(
@ -63,15 +75,7 @@ class NotificationEndpoint(object):
'^network.floating_ip.(dis)?associate|'
'^floatingip.update.end')
def _generate_hostname(self, hostname):
# FIXME: Don't re-calculate the hostname, fetch it from somewhere
project = 'foo'
domain = get_domain()
if CONF.project_subdomain:
host = '%s.%s.%s' % (hostname, project, domain)
else:
host = '%s.%s' % (hostname, domain)
return host
event_handlers = Registry()
def info(self, ctxt, publisher_id, event_type, payload, metadata):
LOG.debug('notification:')
@ -80,105 +84,124 @@ class NotificationEndpoint(object):
LOG.debug("publisher: %s, event: %s, metadata: %s", publisher_id,
event_type, metadata)
ipaclient = IPAClient(backoff=BACKOFF)
if event_type == 'compute.instance.create.end':
hostname = self._generate_hostname(payload.get('hostname'))
instance_id = payload.get('instance_id')
LOG.info("Add new host %s (%s)", instance_id, hostname)
elif event_type == 'compute.instance.update':
join_controller = join.JoinController(ipaclient)
hostname_short = payload.get('hostname')
instance_id = payload.get('instance_id')
payload_metadata = payload.get('metadata')
image_metadata = payload.get('image_meta')
event_handler = self.event_handlers.get(
event_type, lambda payload: LOG.error("Status update or unknown"))
# run event handler for received notification type
event_handler(self, payload)
hostname = self._generate_hostname(hostname_short)
@event_handlers('compute.instance.create.end')
def instance_create(self, payload):
hostname = self._generate_hostname(payload.get('hostname'))
instance_id = payload.get('instance_id')
LOG.info("Add new host %s (%s)", instance_id, hostname)
enroll = payload_metadata.get('ipa_enroll', '')
image_enroll = image_metadata.get('ipa_enroll', '')
if enroll.lower() != 'true' and image_enroll.lower() != 'true':
LOG.info('IPA enrollment not requested, skipping update of %s',
hostname)
return
# Ensure this instance exists in nova
instance = get_instance(instance_id)
if instance is None:
msg = 'No such instance-id, %s' % instance_id
LOG.error(msg)
return
@event_handlers('compute.instance.update')
def instance_update(self, payload):
ipa = ipaclient()
join_controller = join.JoinController(ipa)
hostname_short = payload.get('hostname')
instance_id = payload.get('instance_id')
payload_metadata = payload.get('metadata')
image_metadata = payload.get('image_meta')
ipaclient.start_batch_operation()
# key-per-service
managed_services = [
payload_metadata[key] for key in payload_metadata.keys()
if key.startswith('managed_service_')]
if managed_services:
join_controller.handle_services(hostname, managed_services)
# compact json format
if 'compact_services' in payload_metadata:
join_controller.handle_compact_services(
hostname_short, payload_metadata.get('compact_services'))
ipaclient.flush_batch_operation()
elif event_type == 'compute.instance.delete.end':
hostname_short = payload.get('hostname')
instance_id = payload.get('instance_id')
payload_metadata = payload.get('metadata')
image_metadata = payload.get('image_meta')
hostname = self._generate_hostname(hostname_short)
hostname = self._generate_hostname(hostname_short)
enroll = payload_metadata.get('ipa_enroll', '')
image_enroll = image_metadata.get('ipa_enroll', '')
if enroll.lower() != 'true' and image_enroll.lower() != 'true':
LOG.info('IPA enrollment not requested, skipping update of %s',
hostname)
return
# Ensure this instance exists in nova
instance = get_instance(instance_id)
if instance is None:
msg = 'No such instance-id, %s' % instance_id
LOG.error(msg)
return
enroll = payload_metadata.get('ipa_enroll', '')
image_enroll = image_metadata.get('ipa_enroll', '')
ipa.start_batch_operation()
# key-per-service
managed_services = [
payload_metadata[key] for key in payload_metadata.keys()
if key.startswith('managed_service_')]
if managed_services:
join_controller.handle_services(hostname, managed_services)
# compact json format
if 'compact_services' in payload_metadata:
join_controller.handle_compact_services(
hostname_short, payload_metadata.get('compact_services'))
ipa.flush_batch_operation()
if enroll.lower() != 'true' and image_enroll.lower() != 'true':
LOG.info('IPA enrollment not requested, skipping delete of %s',
hostname)
return
@event_handlers('compute.instance.delete.end')
def instance_delete(self, payload):
hostname_short = payload.get('hostname')
instance_id = payload.get('instance_id')
payload_metadata = payload.get('metadata')
image_metadata = payload.get('image_meta')
LOG.info("Delete host %s (%s)", instance_id, hostname)
ipaclient.delete_host(hostname, {})
self.delete_subhosts(ipaclient, hostname_short, payload_metadata)
elif event_type == 'network.floating_ip.associate':
floating_ip = payload.get('floating_ip')
LOG.info("Associate floating IP %s" % floating_ip)
nova = novaclient()
server = nova.servers.get(payload.get('instance_id'))
if server:
ipaclient.add_ip(server.get, floating_ip)
else:
LOG.error("Could not resolve %s into a hostname",
payload.get('instance_id'))
elif event_type == 'network.floating_ip.disassociate':
floating_ip = payload.get('floating_ip')
LOG.info("Disassociate floating IP %s" % floating_ip)
nova = novaclient()
server = nova.servers.get(payload.get('instance_id'))
if server:
ipaclient.remove_ip(server.name, floating_ip)
else:
LOG.error("Could not resolve %s into a hostname",
payload.get('instance_id'))
elif event_type == 'floatingip.update.end': # Neutron
floatingip = payload.get('floatingip')
floating_ip = floatingip.get('floating_ip_address')
port_id = floatingip.get('port_id')
LOG.info("Neutron floating IP associate: %s" % floating_ip)
nova = novaclient()
neutron = neutronclient()
search_opts = {'id': port_id}
ports = neutron.list_ports(**search_opts).get('ports')
if len(ports) == 1:
device_id = ports[0].get('device_id')
if device_id:
server = nova.servers.get(device_id)
if server:
ipaclient.add_ip(server.name, floating_ip)
else:
LOG.error("Expected 1 port, got %d", len(ports))
hostname = self._generate_hostname(hostname_short)
enroll = payload_metadata.get('ipa_enroll', '')
image_enroll = image_metadata.get('ipa_enroll', '')
if enroll.lower() != 'true' and image_enroll.lower() != 'true':
LOG.info('IPA enrollment not requested, skipping delete of %s',
hostname)
return
LOG.info("Delete host %s (%s)", instance_id, hostname)
ipa = ipaclient()
ipa.delete_host(hostname, {})
self.delete_subhosts(ipa, hostname_short, payload_metadata)
@event_handlers('network.floating_ip.associate')
def floaitng_ip_associate(self, payload):
floating_ip = payload.get('floating_ip')
LOG.info("Associate floating IP %s" % floating_ip)
ipa = ipaclient()
nova = novaclient()
server = nova.servers.get(payload.get('instance_id'))
if server:
ipa.add_ip(server.get, floating_ip)
else:
LOG.error("Status update or unknown")
LOG.error("Could not resolve %s into a hostname",
payload.get('instance_id'))
def delete_subhosts(self, ipaclient, hostname_short, metadata):
@event_handlers('network.floating_ip.disassociate')
def floating_ip_disassociate(self, payload):
floating_ip = payload.get('floating_ip')
LOG.info("Disassociate floating IP %s" % floating_ip)
ipa = ipaclient()
nova = novaclient()
server = nova.servers.get(payload.get('instance_id'))
if server:
ipa.remove_ip(server.name, floating_ip)
else:
LOG.error("Could not resolve %s into a hostname",
payload.get('instance_id'))
@event_handlers('floatingip.update.end')
def floating_ip_update(self, payload):
"""Neutron event"""
floatingip = payload.get('floatingip')
floating_ip = floatingip.get('floating_ip_address')
port_id = floatingip.get('port_id')
LOG.info("Neutron floating IP associate: %s" % floating_ip)
ipa = ipaclient()
nova = novaclient()
neutron = neutronclient()
search_opts = {'id': port_id}
ports = neutron.list_ports(**search_opts).get('ports')
if len(ports) == 1:
device_id = ports[0].get('device_id')
if device_id:
server = nova.servers.get(device_id)
if server:
ipa.add_ip(server.name, floating_ip)
else:
LOG.error("Expected 1 port, got %d", len(ports))
def delete_subhosts(self, ipa, hostname_short, metadata):
"""Delete subhosts and remove VIPs if possible.
Servers can have multiple network interfaces, and therefore can
@ -199,14 +222,14 @@ class NotificationEndpoint(object):
return
if 'compact_services' in metadata:
self.handle_compact_services(ipaclient, hostname_short,
self.handle_compact_services(ipa, hostname_short,
metadata.get('compact_services'))
managed_services = [metadata[key] for key in metadata.keys()
if key.startswith('managed_service_')]
if managed_services:
self.handle_managed_services(ipaclient, managed_services)
self.handle_managed_services(ipa, managed_services)
def handle_compact_services(self, ipaclient, host_short,
def handle_compact_services(self, ipa, host_short,
service_repr_json):
"""Reconstructs and removes subhosts for compact services.
@ -225,7 +248,7 @@ class NotificationEndpoint(object):
service_repr = json.loads(service_repr_json)
hosts_found = list()
ipaclient.start_batch_operation()
ipa.start_batch_operation()
for service_name, net_list in service_repr.items():
for network in net_list:
host = "%s.%s" % (host_short, network)
@ -233,11 +256,11 @@ class NotificationEndpoint(object):
# remove host
if principal_host not in hosts_found:
ipaclient.delete_subhost(principal_host)
ipa.delete_subhost(principal_host)
hosts_found.append(principal_host)
ipaclient.flush_batch_operation()
ipa.flush_batch_operation()
def handle_managed_services(self, ipaclient, services):
def handle_managed_services(self, ipa, services):
"""Delete any managed services if possible.
Checks to see if the managed service subhost has no managed hosts
@ -250,19 +273,29 @@ class NotificationEndpoint(object):
for principal in services:
if principal not in services_deleted:
try:
if ipaclient.service_has_hosts(principal):
if ipa.service_has_hosts(principal):
continue
except KeyError:
continue
ipaclient.delete_service(principal, batch=False)
ipa.delete_service(principal, batch=False)
services_deleted.append(principal)
principal_host = principal.split('/', 1)[1]
if principal_host not in hosts_deleted:
if not ipaclient.host_has_services(principal_host):
ipaclient.delete_subhost(principal_host, batch=False)
if not ipa.host_has_services(principal_host):
ipa.delete_subhost(principal_host, batch=False)
hosts_deleted.append(principal_host)
def _generate_hostname(self, hostname):
# FIXME: Don't re-calculate the hostname, fetch it from somewhere
project = 'foo'
domain = get_domain()
if CONF.project_subdomain:
host = '%s.%s.%s' % (hostname, project, domain)
else:
host = '%s.%s' % (hostname, domain)
return host
def main():
register_keystoneauth_opts(CONF)