From 1eae7a670bc305a75ee3765056f48ae0648bc9f8 Mon Sep 17 00:00:00 2001 From: Rob Crittenden Date: Fri, 3 Mar 2017 17:53:07 -0500 Subject: [PATCH] Make the IPA connection code more robust for notifications Notifications can be multi-threaded but each thread was sharing the same IPAClient instance which was causing contention in the retry code (and likely the ccache). Move the IPA object closer to execution so each notification will have its own instance. This will mean more kinit activity but each one will be isolated and be able to handle expired tickets, IPA being down, network issues, etc. Add backoff code on failures so we don't spam the IPA server with retries. It is a by-two backoff from 2 - 1024 seconds. NOTE: novajoin-server will not use this backoff code. It will continue to use the retries configuration setting. This is because we know there is a limited window to respond so cannot infinitely do a retry unlike notifications. Change-Id: Ia18d3f97f7549c89dcf4e6f014f44c3fcebc919f --- novajoin/ipa.py | 76 +++++++++++++++++++++++---------------- novajoin/notifications.py | 41 ++++++++++----------- 2 files changed, 67 insertions(+), 50 deletions(-) diff --git a/novajoin/ipa.py b/novajoin/ipa.py index e8a80d2..f96f450 100644 --- a/novajoin/ipa.py +++ b/novajoin/ipa.py @@ -13,9 +13,11 @@ # under the License. import os +import time import uuid try: + from gssapi.exceptions import GSSError from ipalib import api from ipalib import errors from ipapython.ipautil import kinit_keytab @@ -38,7 +40,7 @@ LOG = logging.getLogger(__name__) class IPANovaJoinBase(object): - def __init__(self): + def __init__(self, backoff=0): try: self.ntries = CONF.connect_retries except cfg.NoSuchOptError: @@ -54,6 +56,7 @@ class IPANovaJoinBase(object): api.bootstrap(context='novajoin') api.finalize() self.batch_args = list() + self.backoff = backoff def split_principal(self, principal): """Split a principal into its components. Copied from IPA 4.0.0""" @@ -109,11 +112,19 @@ class IPANovaJoinBase(object): return (hostname, realm) + def __backoff(self): + LOG.debug("Backing off %s seconds", self.backoff) + time.sleep(self.backoff) + if self.backoff < 1024: + self.backoff = self.backoff * 2 + def __get_connection(self): """Make a connection to IPA or raise an error.""" tries = 0 - while tries <= self.ntries: + while (tries <= self.ntries) or (self.backoff > 0): + if self.backoff == 0: + LOG.debug("Attempt %d of %d", tries, self.ntries) if api.Backend.rpcclient.isconnected(): api.Backend.rpcclient.disconnect() try: @@ -126,11 +137,20 @@ class IPANovaJoinBase(object): errors.KerberosError) as e: LOG.debug("kinit again: %s", e) # pylint: disable=no-member - kinit_keytab(str('nova/%s@%s' % - (api.env.host, api.env.realm)), - CONF.keytab, - self.ccache) + try: + kinit_keytab(str('nova/%s@%s' % + (api.env.host, api.env.realm)), + CONF.keytab, + self.ccache) + except GSSError as e: + LOG.debug("kinit failed: %s", e) + if tries > 0 and self.backoff: + self.__backoff() tries += 1 + except errors.NetworkError: + tries += 1 + if self.backoff: + self.__backoff() else: return @@ -151,11 +171,7 @@ class IPANovaJoinBase(object): }) def flush_batch_operation(self): - """Make an IPA batch call - - Try twice to run the command. One execution may fail if we - previously had a connection but the ticket expired. - """ + """Make an IPA batch call.""" LOG.debug("flush_batch_operation") if not self.batch_args: return None @@ -166,28 +182,27 @@ class IPANovaJoinBase(object): return self._call_ipa('batch', *self.batch_args, **kw) def _call_ipa(self, command, *args, **kw): - """Make an IPA call. - - Try twice to run the command. One execution may fail if we - previously had a connection but the ticket expired. - """ - + """Make an IPA call.""" if not api.Backend.rpcclient.isconnected(): self.__get_connection() if 'version' not in kw: kw['version'] = u'2.146' # IPA v4.2.0 for compatibility - try: - result = api.Command[command](*args, **kw) - LOG.debug(result) - return result - except (errors.CCacheError, - errors.TicketExpired, - errors.KerberosError): - LOG.debug("Refresh authentication") - self.__get_connection() - result = api.Command[command](*args, **kw) - LOG.debug(result) - return result + + while True: + try: + result = api.Command[command](*args, **kw) + LOG.debug(result) + return result + except (errors.CCacheError, + errors.TicketExpired, + errors.KerberosError): + LOG.debug("Refresh authentication") + self.__get_connection() + except errors.NetworkError: + if self.backoff: + self.__backoff() + else: + raise def _ipa_client_configured(self): """Determine if the machine is an enrolled IPA client. @@ -383,7 +398,8 @@ class IPAClient(IPANovaJoinBase): try: (service, hostname, realm) = self.split_principal( - service_principal) + service_principal + ) except errors.MalformedServicePrincipal as e: LOG.error("Unable to split principal %s: %s", service_principal, e) raise diff --git a/novajoin/notifications.py b/novajoin/notifications.py index 9eef23f..beb3855 100644 --- a/novajoin/notifications.py +++ b/novajoin/notifications.py @@ -38,6 +38,8 @@ CONF = config.CONF LOG = logging.getLogger(__name__) +BACKOFF = 2 + def novaclient(): session = get_session() @@ -58,9 +60,6 @@ class NotificationEndpoint(object): '^network.floating_ip.(dis)?associate|' '^floatingip.update.end') - def __init__(self): - self.ipaclient = IPAClient() - def _generate_hostname(self, hostname): # FIXME: Don't re-calculate the hostname, fetch it from somewhere project = 'foo' @@ -78,6 +77,7 @@ class NotificationEndpoint(object): LOG.debug("publisher: %s, event: %s, metadata: %s", publisher_id, event_type, metadata) + ipaclient = IPAClient(backoff=BACKOFF) if event_type == 'compute.instance.create.end': hostname = self._generate_hostname(payload.get('hostname')) instance_id = payload.get('instance_id') @@ -99,15 +99,15 @@ class NotificationEndpoint(object): return LOG.info("Delete host %s (%s)", instance_id, hostname) - self.ipaclient.delete_host(hostname, {}) - self.delete_subhosts(hostname_short, payload_metadata) + ipaclient.delete_host(hostname, {}) + self.delete_subhosts(ipaclient, hostname_short, payload_metadata) elif event_type == 'network.floating_ip.associate': floating_ip = payload.get('floating_ip') LOG.info("Associate floating IP %s" % floating_ip) nova = novaclient() server = nova.servers.get(payload.get('instance_id')) if server: - self.ipaclient.add_ip(server.get, floating_ip) + ipaclient.add_ip(server.get, floating_ip) else: LOG.error("Could not resolve %s into a hostname", payload.get('instance_id')) @@ -117,7 +117,7 @@ class NotificationEndpoint(object): nova = novaclient() server = nova.servers.get(payload.get('instance_id')) if server: - self.ipaclient.remove_ip(server.name, floating_ip) + ipaclient.remove_ip(server.name, floating_ip) else: LOG.error("Could not resolve %s into a hostname", payload.get('instance_id')) @@ -135,13 +135,13 @@ class NotificationEndpoint(object): if device_id: server = nova.servers.get(device_id) if server: - self.ipaclient.add_ip(server.name, floating_ip) + ipaclient.add_ip(server.name, floating_ip) else: LOG.error("Expected 1 port, got %d", len(ports)) else: LOG.error("Status update or unknown") - def delete_subhosts(self, hostname_short, metadata): + def delete_subhosts(self, ipaclient, hostname_short, metadata): """Delete subhosts and remove VIPs if possible. Servers can have multiple network interfaces, and therefore can @@ -162,14 +162,15 @@ class NotificationEndpoint(object): return if 'compact_services' in metadata: - self.handle_compact_services(hostname_short, + self.handle_compact_services(ipaclient, hostname_short, metadata.get('compact_services')) managed_services = [metadata[key] for key in metadata.keys() if key.startswith('managed_service_')] if managed_services: - self.handle_managed_services(managed_services) + self.handle_managed_services(ipaclient, managed_services) - def handle_compact_services(self, host_short, service_repr_json): + def handle_compact_services(self, ipaclient, host_short, + service_repr_json): """Reconstructs and removes subhosts for compact services. Data looks like this: @@ -187,7 +188,7 @@ class NotificationEndpoint(object): service_repr = json.loads(service_repr_json) hosts_found = list() - self.ipaclient.start_batch_operation() + ipaclient.start_batch_operation() for service_name, net_list in service_repr.items(): for network in net_list: host = "%s.%s" % (host_short, network) @@ -195,11 +196,11 @@ class NotificationEndpoint(object): # remove host if principal_host not in hosts_found: - self.ipaclient.delete_subhost(principal_host) + ipaclient.delete_subhost(principal_host) hosts_found.append(principal_host) - self.ipaclient.flush_batch_operation() + ipaclient.flush_batch_operation() - def handle_managed_services(self, services): + def handle_managed_services(self, ipaclient, services): """Delete any managed services if possible. Checks to see if the managed service subhost has no managed hosts @@ -212,17 +213,17 @@ class NotificationEndpoint(object): for principal in services: if principal not in services_deleted: try: - if self.ipaclient.service_has_hosts(principal): + if ipaclient.service_has_hosts(principal): continue except KeyError: continue - self.ipaclient.delete_service(principal, batch=False) + ipaclient.delete_service(principal, batch=False) services_deleted.append(principal) principal_host = principal.split('/', 1)[1] if principal_host not in hosts_deleted: - if not self.ipaclient.host_has_services(principal_host): - self.ipaclient.delete_subhost(principal_host, batch=False) + if not ipaclient.host_has_services(principal_host): + ipaclient.delete_subhost(principal_host, batch=False) hosts_deleted.append(principal_host)