check syncs and retry if inconsistent
This commit is contained in:
parent
8ec5688fe4
commit
4a9602f1c1
@ -64,6 +64,7 @@ from keystone_utils import (
|
||||
check_peer_actions,
|
||||
CA_CERT_PATH,
|
||||
ensure_permissions,
|
||||
print_rel_debug,
|
||||
)
|
||||
|
||||
from charmhelpers.contrib.hahelpers.cluster import (
|
||||
@ -219,6 +220,8 @@ def pgsql_db_changed():
|
||||
@hooks.hook('identity-service-relation-changed')
|
||||
@synchronize_ca_if_changed()
|
||||
def identity_changed(relation_id=None, remote_unit=None):
|
||||
CONFIGS.write_all()
|
||||
|
||||
notifications = {}
|
||||
if is_elected_leader(CLUSTER_RES):
|
||||
# Catch database not configured error and defer until db ready
|
||||
@ -236,8 +239,6 @@ def identity_changed(relation_id=None, remote_unit=None):
|
||||
log("Unexpected exception occurred", level=ERROR)
|
||||
raise
|
||||
|
||||
CONFIGS.write_all()
|
||||
|
||||
settings = relation_get(rid=relation_id, unit=remote_unit)
|
||||
service = settings.get('service', None)
|
||||
if service:
|
||||
@ -332,6 +333,10 @@ def cluster_changed():
|
||||
|
||||
echo_whitelist.append('ssl-synced-units')
|
||||
|
||||
echo_settings = {k: v for k, v in relation_get().iteritems()
|
||||
if k in echo_whitelist}
|
||||
print_rel_debug(echo_settings, None, None, 'cluster_changed', '1')
|
||||
|
||||
# ssl cert sync must be done BEFORE this to reduce the risk of feedback
|
||||
# loops in cluster relation
|
||||
peer_echo(includes=echo_whitelist)
|
||||
@ -390,8 +395,9 @@ def ha_joined():
|
||||
@restart_on_change(restart_map())
|
||||
@synchronize_ca_if_changed()
|
||||
def ha_changed():
|
||||
clustered = relation_get('clustered')
|
||||
CONFIGS.write_all()
|
||||
|
||||
clustered = relation_get('clustered')
|
||||
if clustered and is_elected_leader(CLUSTER_RES):
|
||||
ensure_initial_admin(config)
|
||||
log('Cluster configured, notifying other services and updating '
|
||||
@ -442,14 +448,15 @@ def upgrade_charm():
|
||||
group='keystone',
|
||||
peer_interface='cluster',
|
||||
ensure_local_user=True)
|
||||
|
||||
CONFIGS.write_all()
|
||||
|
||||
if is_elected_leader(CLUSTER_RES):
|
||||
log('Cluster leader - ensuring endpoint configuration is up to '
|
||||
'date', level=DEBUG)
|
||||
time.sleep(10)
|
||||
update_all_identity_relation_units()
|
||||
|
||||
CONFIGS.write_all()
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
|
@ -5,6 +5,13 @@ import shutil
|
||||
import subprocess
|
||||
import tarfile
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
from charmhelpers.core.hookenv import (
|
||||
log,
|
||||
DEBUG,
|
||||
WARNING,
|
||||
)
|
||||
|
||||
CA_EXPIRY = '365'
|
||||
ORG_NAME = 'Ubuntu'
|
||||
@ -278,23 +285,42 @@ class JujuCA(object):
|
||||
crt = self._sign_csr(csr, service, common_name)
|
||||
cmd = ['chown', '-R', '%s.%s' % (self.user, self.group), self.ca_dir]
|
||||
subprocess.check_call(cmd)
|
||||
print 'Signed new CSR, crt @ %s' % crt
|
||||
log('Signed new CSR, crt @ %s' % crt, level=DEBUG)
|
||||
return crt, key
|
||||
|
||||
def get_cert_and_key(self, common_name):
|
||||
print 'Getting certificate and key for %s.' % common_name
|
||||
key = os.path.join(self.ca_dir, 'certs', '%s.key' % common_name)
|
||||
crt = os.path.join(self.ca_dir, 'certs', '%s.crt' % common_name)
|
||||
if os.path.isfile(crt):
|
||||
print 'Found existing certificate for %s.' % common_name
|
||||
crt = open(crt, 'r').read()
|
||||
try:
|
||||
key = open(key, 'r').read()
|
||||
except:
|
||||
print 'Could not load ssl private key for %s from %s' %\
|
||||
(common_name, key)
|
||||
exit(1)
|
||||
return crt, key
|
||||
log('Getting certificate and key for %s.' % common_name, level=DEBUG)
|
||||
keypath = os.path.join(self.ca_dir, 'certs', '%s.key' % common_name)
|
||||
crtpath = os.path.join(self.ca_dir, 'certs', '%s.crt' % common_name)
|
||||
if os.path.isfile(crtpath):
|
||||
log('Found existing certificate for %s.' % common_name,
|
||||
level=DEBUG)
|
||||
max_retries = 3
|
||||
while True:
|
||||
mtime = os.path.getmtime(crtpath)
|
||||
|
||||
crt = open(crtpath, 'r').read()
|
||||
try:
|
||||
key = open(keypath, 'r').read()
|
||||
except:
|
||||
msg = ('Could not load ssl private key for %s from %s' %
|
||||
(common_name, keypath))
|
||||
raise Exception(msg)
|
||||
|
||||
# Ensure we are not reading a file that is being written to
|
||||
if mtime != os.path.getmtime(crtpath):
|
||||
max_retries -= 1
|
||||
if max_retries == 0:
|
||||
msg = ("crt contents changed during read - retry "
|
||||
"failed")
|
||||
raise Exception(msg)
|
||||
|
||||
log("crt contents changed during read - re-reading",
|
||||
level=WARNING)
|
||||
time.sleep(1)
|
||||
else:
|
||||
return crt, key
|
||||
|
||||
crt, key = self._create_certificate(common_name, common_name)
|
||||
return open(crt, 'r').read(), open(key, 'r').read()
|
||||
|
||||
|
@ -63,6 +63,7 @@ from charmhelpers.core.hookenv import (
|
||||
DEBUG,
|
||||
INFO,
|
||||
WARNING,
|
||||
ERROR,
|
||||
)
|
||||
|
||||
from charmhelpers.fetch import (
|
||||
@ -652,14 +653,14 @@ def check_peer_actions():
|
||||
key = re.compile("^(.+)?\.(.+)?\.(.+)")
|
||||
res = re.search(key, flag)
|
||||
if res:
|
||||
source = res. group(1)
|
||||
service = res. group(2)
|
||||
action = res. group(3)
|
||||
source = res.group(1)
|
||||
service = res.group(2)
|
||||
action = res.group(3)
|
||||
else:
|
||||
key = re.compile("^(.+)?\.(.+)?")
|
||||
res = re.search(key, flag)
|
||||
source = res. group(1)
|
||||
action = res. group(2)
|
||||
source = res.group(1)
|
||||
action = res.group(2)
|
||||
|
||||
# Don't execute actions requested byu this unit.
|
||||
if local_unit().replace('.', '-') != source:
|
||||
@ -676,7 +677,7 @@ def check_peer_actions():
|
||||
(action, service), level=DEBUG)
|
||||
service_stop(service)
|
||||
elif action == 'update-ca-certificates':
|
||||
log("Running update-ca-certificates", level=DEBUG)
|
||||
log("Running %s" % (action), level=DEBUG)
|
||||
subprocess.check_call(['update-ca-certificates'])
|
||||
else:
|
||||
log("Unknown action flag=%s" % (flag), level=WARNING)
|
||||
@ -700,12 +701,13 @@ def create_peer_service_actions(action, services):
|
||||
perms=0o644)
|
||||
|
||||
|
||||
def create_service_action(action):
|
||||
action = "%s.%s" % (local_unit().replace('/', '-'), action)
|
||||
flagfile = os.path.join(SYNC_FLAGS_DIR, action)
|
||||
log("Creating action %s" % (flagfile), level=DEBUG)
|
||||
write_file(flagfile, content='', owner=SSH_USER, group='keystone',
|
||||
perms=0o644)
|
||||
def create_peer_actions(actions):
|
||||
for action in actions:
|
||||
action = "%s.%s" % (local_unit().replace('/', '-'), action)
|
||||
flagfile = os.path.join(SYNC_FLAGS_DIR, action)
|
||||
log("Creating action %s" % (flagfile), level=DEBUG)
|
||||
write_file(flagfile, content='', owner=SSH_USER, group='keystone',
|
||||
perms=0o644)
|
||||
|
||||
|
||||
@retry_on_exception(3, base_delay=2, exc_type=subprocess.CalledProcessError)
|
||||
@ -756,22 +758,47 @@ def synchronize_ca(fatal=False):
|
||||
# We need to restart peer apache services to ensure they have picked up
|
||||
# new ssl keys.
|
||||
create_peer_service_actions('restart', ['apache2'])
|
||||
create_service_action('update-ca-certificates')
|
||||
try:
|
||||
unison_sync(paths_to_sync)
|
||||
except:
|
||||
if fatal:
|
||||
raise
|
||||
else:
|
||||
log("Sync failed but fatal=False", level=INFO)
|
||||
return
|
||||
create_peer_actions(['update-ca-certificates'])
|
||||
|
||||
trigger = str(uuid.uuid4())
|
||||
log("Sending restart-services-trigger=%s to all peers" % (trigger),
|
||||
retries = 3
|
||||
while True:
|
||||
hash1 = hashlib.sha256()
|
||||
for path in paths_to_sync:
|
||||
update_hash_from_path(hash1, path)
|
||||
|
||||
try:
|
||||
unison_sync(paths_to_sync)
|
||||
except:
|
||||
if fatal:
|
||||
raise
|
||||
else:
|
||||
log("Sync failed but fatal=False", level=INFO)
|
||||
return
|
||||
|
||||
hash2 = hashlib.sha256()
|
||||
for path in paths_to_sync:
|
||||
update_hash_from_path(hash2, path)
|
||||
|
||||
# Detect whether someone else has synced to this unit while we did our
|
||||
# transfer.
|
||||
if hash1.hexdigest() != hash2.hexdigest():
|
||||
retries -= 1
|
||||
if retries > 0:
|
||||
log("SSL dir contents changed during sync - retrying unison "
|
||||
"sync %s more times" % (retries), level=WARNING)
|
||||
else:
|
||||
log("SSL dir contents changed during sync - retries failed",
|
||||
level=ERROR)
|
||||
return {}
|
||||
else:
|
||||
break
|
||||
|
||||
hash = hash1.hexdigest()
|
||||
log("Sending restart-services-trigger=%s to all peers" % (hash),
|
||||
level=DEBUG)
|
||||
|
||||
log("Sync complete", level=DEBUG)
|
||||
return {'restart-services-trigger': trigger,
|
||||
return {'restart-services-trigger': hash,
|
||||
'ssl-synced-units': peer_units()}
|
||||
|
||||
|
||||
@ -802,28 +829,31 @@ def synchronize_ca_if_changed(force=False, fatal=False):
|
||||
def inner_synchronize_ca_if_changed2(*args, **kwargs):
|
||||
# Only sync master can do sync. Ensure (a) we are not nested and
|
||||
# (b) a master is elected and we are it.
|
||||
acquired = SSL_SYNC_SEMAPHORE.acquire(blocking=0)
|
||||
try:
|
||||
acquired = SSL_SYNC_SEMAPHORE.acquire(blocking=0)
|
||||
if not acquired or not is_elected_leader(CLUSTER_RES):
|
||||
if not acquired:
|
||||
log("Nested sync - ignoring", level=DEBUG)
|
||||
return f(*args, **kwargs)
|
||||
|
||||
if not is_elected_leader(CLUSTER_RES):
|
||||
log("Not leader - ignoring sync", level=DEBUG)
|
||||
return f(*args, **kwargs)
|
||||
|
||||
peer_settings = {}
|
||||
# Ensure we don't do a double sync if we are nested.
|
||||
if not force:
|
||||
hash1 = hashlib.sha256()
|
||||
for path in [SSL_DIR, APACHE_SSL_DIR, CA_CERT_PATH]:
|
||||
update_hash_from_path(hash1, path)
|
||||
ssl_dirs = [SSL_DIR, APACHE_SSL_DIR, CA_CERT_PATH]
|
||||
|
||||
hash1 = hash1.hexdigest()
|
||||
hash1 = hashlib.sha256()
|
||||
for path in ssl_dirs:
|
||||
update_hash_from_path(hash1, path)
|
||||
|
||||
ret = f(*args, **kwargs)
|
||||
|
||||
hash2 = hashlib.sha256()
|
||||
for path in [SSL_DIR, APACHE_SSL_DIR, CA_CERT_PATH]:
|
||||
for path in ssl_dirs:
|
||||
update_hash_from_path(hash2, path)
|
||||
|
||||
hash2 = hash2.hexdigest()
|
||||
if hash1 != hash2:
|
||||
if hash1.hexdigest() != hash2.hexdigest():
|
||||
log("SSL certs have changed - syncing peers",
|
||||
level=DEBUG)
|
||||
peer_settings = synchronize_ca(fatal=fatal)
|
||||
@ -832,9 +862,8 @@ def synchronize_ca_if_changed(force=False, fatal=False):
|
||||
level=DEBUG)
|
||||
else:
|
||||
ret = f(*args, **kwargs)
|
||||
if force:
|
||||
log("Doing forced ssl cert sync", level=DEBUG)
|
||||
peer_settings = synchronize_ca(fatal=fatal)
|
||||
log("Doing forced ssl cert sync", level=DEBUG)
|
||||
peer_settings = synchronize_ca(fatal=fatal)
|
||||
|
||||
if peer_settings:
|
||||
for rid in relation_ids('cluster'):
|
||||
@ -889,6 +918,22 @@ def relation_list(rid):
|
||||
return result
|
||||
|
||||
|
||||
def print_rel_debug(relation_data, remote_unit, relation_id, tag, name):
|
||||
debug_settings = relation_get(unit=local_unit(), rid=relation_id)
|
||||
diff = {k: {'b': debug_settings[k], 'a': v} for k, v in
|
||||
relation_data.iteritems()
|
||||
if (k in debug_settings and
|
||||
relation_data[k] != debug_settings.get(k))}
|
||||
|
||||
unchanged = [k for k in debug_settings.iterkeys()
|
||||
if k not in relation_data]
|
||||
|
||||
log("[debug:%s:%s:%s:%s] diff=%s" %
|
||||
(name, tag, remote_unit, relation_id, str(diff)), level=DEBUG)
|
||||
log("[debug:%s:%s:%s:%s] unchanged=%s" %
|
||||
(name, tag, remote_unit, relation_id, unchanged), level=DEBUG)
|
||||
|
||||
|
||||
def add_service_to_keystone(relation_id=None, remote_unit=None):
|
||||
import manager
|
||||
manager = manager.KeystoneManager(endpoint=get_local_endpoint(),
|
||||
@ -900,7 +945,7 @@ def add_service_to_keystone(relation_id=None, remote_unit=None):
|
||||
https_cns = []
|
||||
if single.issubset(settings):
|
||||
# other end of relation advertised only one endpoint
|
||||
if 'None' in [v for k, v in settings.iteritems()]:
|
||||
if 'None' in settings.itervalues():
|
||||
# Some backend services advertise no endpoint but require a
|
||||
# hook execution to update auth strategy.
|
||||
relation_data = {}
|
||||
@ -928,6 +973,10 @@ def add_service_to_keystone(relation_id=None, remote_unit=None):
|
||||
for role in get_requested_roles(settings):
|
||||
log("Creating requested role: %s" % role)
|
||||
create_role(role)
|
||||
|
||||
print_rel_debug(relation_data, remote_unit, relation_id, "1",
|
||||
'add-svc-to-ks')
|
||||
|
||||
peer_store_and_set(relation_id=relation_id,
|
||||
**relation_data)
|
||||
return
|
||||
@ -1003,7 +1052,7 @@ def add_service_to_keystone(relation_id=None, remote_unit=None):
|
||||
if prefix:
|
||||
service_username = "%s%s" % (prefix, service_username)
|
||||
|
||||
if 'None' in [v for k, v in settings.iteritems()]:
|
||||
if 'None' in settings.itervalues():
|
||||
return
|
||||
|
||||
if not service_username:
|
||||
@ -1041,10 +1090,10 @@ def add_service_to_keystone(relation_id=None, remote_unit=None):
|
||||
"service_password": service_password,
|
||||
"service_tenant": service_tenant,
|
||||
"service_tenant_id": manager.resolve_tenant_id(service_tenant),
|
||||
"https_keystone": "False",
|
||||
"ssl_cert": "",
|
||||
"ssl_key": "",
|
||||
"ca_cert": ""
|
||||
"https_keystone": None,
|
||||
"ssl_cert": None,
|
||||
"ssl_key": None,
|
||||
"ca_cert": None
|
||||
}
|
||||
|
||||
# Check if https is enabled
|
||||
@ -1070,6 +1119,9 @@ def add_service_to_keystone(relation_id=None, remote_unit=None):
|
||||
ca_bundle = ca.get_ca_bundle()
|
||||
relation_data['ca_cert'] = b64encode(ca_bundle)
|
||||
relation_data['https_keystone'] = 'True'
|
||||
|
||||
print_rel_debug(relation_data, remote_unit, relation_id, "2",
|
||||
'add-svc-to-ks')
|
||||
peer_store_and_set(relation_id=relation_id,
|
||||
**relation_data)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user