Make the IPA connection code more robust for notifications
Notifications can be multi-threaded but each thread was sharing the same IPAClient instance which was causing contention in the retry code (and likely the ccache). Move the IPA object closer to execution so each notification will have its own instance. This will mean more kinit activity but each one will be isolated and be able to handle expired tickets, IPA being down, network issues, etc. Add backoff code on failures so we don't spam the IPA server with retries. It is a by-two backoff from 2 - 1024 seconds. NOTE: novajoin-server will not use this backoff code. It will continue to use the retries configuration setting. This is because we know there is a limited window to respond so cannot infinitely do a retry unlike notifications. Change-Id: Ia18d3f97f7549c89dcf4e6f014f44c3fcebc919f
This commit is contained in:
parent
886dae3ad8
commit
1eae7a670b
|
@ -13,9 +13,11 @@
|
|||
# under the License.
|
||||
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
|
||||
try:
|
||||
from gssapi.exceptions import GSSError
|
||||
from ipalib import api
|
||||
from ipalib import errors
|
||||
from ipapython.ipautil import kinit_keytab
|
||||
|
@ -38,7 +40,7 @@ LOG = logging.getLogger(__name__)
|
|||
|
||||
class IPANovaJoinBase(object):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, backoff=0):
|
||||
try:
|
||||
self.ntries = CONF.connect_retries
|
||||
except cfg.NoSuchOptError:
|
||||
|
@ -54,6 +56,7 @@ class IPANovaJoinBase(object):
|
|||
api.bootstrap(context='novajoin')
|
||||
api.finalize()
|
||||
self.batch_args = list()
|
||||
self.backoff = backoff
|
||||
|
||||
def split_principal(self, principal):
|
||||
"""Split a principal into its components. Copied from IPA 4.0.0"""
|
||||
|
@ -109,11 +112,19 @@ class IPANovaJoinBase(object):
|
|||
|
||||
return (hostname, realm)
|
||||
|
||||
def __backoff(self):
|
||||
LOG.debug("Backing off %s seconds", self.backoff)
|
||||
time.sleep(self.backoff)
|
||||
if self.backoff < 1024:
|
||||
self.backoff = self.backoff * 2
|
||||
|
||||
def __get_connection(self):
|
||||
"""Make a connection to IPA or raise an error."""
|
||||
tries = 0
|
||||
|
||||
while tries <= self.ntries:
|
||||
while (tries <= self.ntries) or (self.backoff > 0):
|
||||
if self.backoff == 0:
|
||||
LOG.debug("Attempt %d of %d", tries, self.ntries)
|
||||
if api.Backend.rpcclient.isconnected():
|
||||
api.Backend.rpcclient.disconnect()
|
||||
try:
|
||||
|
@ -126,11 +137,20 @@ class IPANovaJoinBase(object):
|
|||
errors.KerberosError) as e:
|
||||
LOG.debug("kinit again: %s", e)
|
||||
# pylint: disable=no-member
|
||||
kinit_keytab(str('nova/%s@%s' %
|
||||
(api.env.host, api.env.realm)),
|
||||
CONF.keytab,
|
||||
self.ccache)
|
||||
try:
|
||||
kinit_keytab(str('nova/%s@%s' %
|
||||
(api.env.host, api.env.realm)),
|
||||
CONF.keytab,
|
||||
self.ccache)
|
||||
except GSSError as e:
|
||||
LOG.debug("kinit failed: %s", e)
|
||||
if tries > 0 and self.backoff:
|
||||
self.__backoff()
|
||||
tries += 1
|
||||
except errors.NetworkError:
|
||||
tries += 1
|
||||
if self.backoff:
|
||||
self.__backoff()
|
||||
else:
|
||||
return
|
||||
|
||||
|
@ -151,11 +171,7 @@ class IPANovaJoinBase(object):
|
|||
})
|
||||
|
||||
def flush_batch_operation(self):
|
||||
"""Make an IPA batch call
|
||||
|
||||
Try twice to run the command. One execution may fail if we
|
||||
previously had a connection but the ticket expired.
|
||||
"""
|
||||
"""Make an IPA batch call."""
|
||||
LOG.debug("flush_batch_operation")
|
||||
if not self.batch_args:
|
||||
return None
|
||||
|
@ -166,28 +182,27 @@ class IPANovaJoinBase(object):
|
|||
return self._call_ipa('batch', *self.batch_args, **kw)
|
||||
|
||||
def _call_ipa(self, command, *args, **kw):
|
||||
"""Make an IPA call.
|
||||
|
||||
Try twice to run the command. One execution may fail if we
|
||||
previously had a connection but the ticket expired.
|
||||
"""
|
||||
|
||||
"""Make an IPA call."""
|
||||
if not api.Backend.rpcclient.isconnected():
|
||||
self.__get_connection()
|
||||
if 'version' not in kw:
|
||||
kw['version'] = u'2.146' # IPA v4.2.0 for compatibility
|
||||
try:
|
||||
result = api.Command[command](*args, **kw)
|
||||
LOG.debug(result)
|
||||
return result
|
||||
except (errors.CCacheError,
|
||||
errors.TicketExpired,
|
||||
errors.KerberosError):
|
||||
LOG.debug("Refresh authentication")
|
||||
self.__get_connection()
|
||||
result = api.Command[command](*args, **kw)
|
||||
LOG.debug(result)
|
||||
return result
|
||||
|
||||
while True:
|
||||
try:
|
||||
result = api.Command[command](*args, **kw)
|
||||
LOG.debug(result)
|
||||
return result
|
||||
except (errors.CCacheError,
|
||||
errors.TicketExpired,
|
||||
errors.KerberosError):
|
||||
LOG.debug("Refresh authentication")
|
||||
self.__get_connection()
|
||||
except errors.NetworkError:
|
||||
if self.backoff:
|
||||
self.__backoff()
|
||||
else:
|
||||
raise
|
||||
|
||||
def _ipa_client_configured(self):
|
||||
"""Determine if the machine is an enrolled IPA client.
|
||||
|
@ -383,7 +398,8 @@ class IPAClient(IPANovaJoinBase):
|
|||
|
||||
try:
|
||||
(service, hostname, realm) = self.split_principal(
|
||||
service_principal)
|
||||
service_principal
|
||||
)
|
||||
except errors.MalformedServicePrincipal as e:
|
||||
LOG.error("Unable to split principal %s: %s", service_principal, e)
|
||||
raise
|
||||
|
|
|
@ -38,6 +38,8 @@ CONF = config.CONF
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
BACKOFF = 2
|
||||
|
||||
|
||||
def novaclient():
|
||||
session = get_session()
|
||||
|
@ -58,9 +60,6 @@ class NotificationEndpoint(object):
|
|||
'^network.floating_ip.(dis)?associate|'
|
||||
'^floatingip.update.end')
|
||||
|
||||
def __init__(self):
|
||||
self.ipaclient = IPAClient()
|
||||
|
||||
def _generate_hostname(self, hostname):
|
||||
# FIXME: Don't re-calculate the hostname, fetch it from somewhere
|
||||
project = 'foo'
|
||||
|
@ -78,6 +77,7 @@ class NotificationEndpoint(object):
|
|||
LOG.debug("publisher: %s, event: %s, metadata: %s", publisher_id,
|
||||
event_type, metadata)
|
||||
|
||||
ipaclient = IPAClient(backoff=BACKOFF)
|
||||
if event_type == 'compute.instance.create.end':
|
||||
hostname = self._generate_hostname(payload.get('hostname'))
|
||||
instance_id = payload.get('instance_id')
|
||||
|
@ -99,15 +99,15 @@ class NotificationEndpoint(object):
|
|||
return
|
||||
|
||||
LOG.info("Delete host %s (%s)", instance_id, hostname)
|
||||
self.ipaclient.delete_host(hostname, {})
|
||||
self.delete_subhosts(hostname_short, payload_metadata)
|
||||
ipaclient.delete_host(hostname, {})
|
||||
self.delete_subhosts(ipaclient, hostname_short, payload_metadata)
|
||||
elif event_type == 'network.floating_ip.associate':
|
||||
floating_ip = payload.get('floating_ip')
|
||||
LOG.info("Associate floating IP %s" % floating_ip)
|
||||
nova = novaclient()
|
||||
server = nova.servers.get(payload.get('instance_id'))
|
||||
if server:
|
||||
self.ipaclient.add_ip(server.get, floating_ip)
|
||||
ipaclient.add_ip(server.get, floating_ip)
|
||||
else:
|
||||
LOG.error("Could not resolve %s into a hostname",
|
||||
payload.get('instance_id'))
|
||||
|
@ -117,7 +117,7 @@ class NotificationEndpoint(object):
|
|||
nova = novaclient()
|
||||
server = nova.servers.get(payload.get('instance_id'))
|
||||
if server:
|
||||
self.ipaclient.remove_ip(server.name, floating_ip)
|
||||
ipaclient.remove_ip(server.name, floating_ip)
|
||||
else:
|
||||
LOG.error("Could not resolve %s into a hostname",
|
||||
payload.get('instance_id'))
|
||||
|
@ -135,13 +135,13 @@ class NotificationEndpoint(object):
|
|||
if device_id:
|
||||
server = nova.servers.get(device_id)
|
||||
if server:
|
||||
self.ipaclient.add_ip(server.name, floating_ip)
|
||||
ipaclient.add_ip(server.name, floating_ip)
|
||||
else:
|
||||
LOG.error("Expected 1 port, got %d", len(ports))
|
||||
else:
|
||||
LOG.error("Status update or unknown")
|
||||
|
||||
def delete_subhosts(self, hostname_short, metadata):
|
||||
def delete_subhosts(self, ipaclient, hostname_short, metadata):
|
||||
"""Delete subhosts and remove VIPs if possible.
|
||||
|
||||
Servers can have multiple network interfaces, and therefore can
|
||||
|
@ -162,14 +162,15 @@ class NotificationEndpoint(object):
|
|||
return
|
||||
|
||||
if 'compact_services' in metadata:
|
||||
self.handle_compact_services(hostname_short,
|
||||
self.handle_compact_services(ipaclient, hostname_short,
|
||||
metadata.get('compact_services'))
|
||||
managed_services = [metadata[key] for key in metadata.keys()
|
||||
if key.startswith('managed_service_')]
|
||||
if managed_services:
|
||||
self.handle_managed_services(managed_services)
|
||||
self.handle_managed_services(ipaclient, managed_services)
|
||||
|
||||
def handle_compact_services(self, host_short, service_repr_json):
|
||||
def handle_compact_services(self, ipaclient, host_short,
|
||||
service_repr_json):
|
||||
"""Reconstructs and removes subhosts for compact services.
|
||||
|
||||
Data looks like this:
|
||||
|
@ -187,7 +188,7 @@ class NotificationEndpoint(object):
|
|||
service_repr = json.loads(service_repr_json)
|
||||
hosts_found = list()
|
||||
|
||||
self.ipaclient.start_batch_operation()
|
||||
ipaclient.start_batch_operation()
|
||||
for service_name, net_list in service_repr.items():
|
||||
for network in net_list:
|
||||
host = "%s.%s" % (host_short, network)
|
||||
|
@ -195,11 +196,11 @@ class NotificationEndpoint(object):
|
|||
|
||||
# remove host
|
||||
if principal_host not in hosts_found:
|
||||
self.ipaclient.delete_subhost(principal_host)
|
||||
ipaclient.delete_subhost(principal_host)
|
||||
hosts_found.append(principal_host)
|
||||
self.ipaclient.flush_batch_operation()
|
||||
ipaclient.flush_batch_operation()
|
||||
|
||||
def handle_managed_services(self, services):
|
||||
def handle_managed_services(self, ipaclient, services):
|
||||
"""Delete any managed services if possible.
|
||||
|
||||
Checks to see if the managed service subhost has no managed hosts
|
||||
|
@ -212,17 +213,17 @@ class NotificationEndpoint(object):
|
|||
for principal in services:
|
||||
if principal not in services_deleted:
|
||||
try:
|
||||
if self.ipaclient.service_has_hosts(principal):
|
||||
if ipaclient.service_has_hosts(principal):
|
||||
continue
|
||||
except KeyError:
|
||||
continue
|
||||
self.ipaclient.delete_service(principal, batch=False)
|
||||
ipaclient.delete_service(principal, batch=False)
|
||||
services_deleted.append(principal)
|
||||
|
||||
principal_host = principal.split('/', 1)[1]
|
||||
if principal_host not in hosts_deleted:
|
||||
if not self.ipaclient.host_has_services(principal_host):
|
||||
self.ipaclient.delete_subhost(principal_host, batch=False)
|
||||
if not ipaclient.host_has_services(principal_host):
|
||||
ipaclient.delete_subhost(principal_host, batch=False)
|
||||
hosts_deleted.append(principal_host)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue