124 lines
3.3 KiB
Django/Jinja
124 lines
3.3 KiB
Django/Jinja
#!/usr/bin/env python
|
|
|
|
import functools
|
|
import logging
|
|
import os
|
|
import socket
|
|
import sys
|
|
import time
|
|
|
|
import etcd
|
|
|
|
# Retry policy used by the ``retry`` decorator: how many times a call is
# attempted in total and the pause handed to time.sleep() between attempts.
CONNECTION_ATTEMPTS = 3
CONNECTION_DELAY = 5

# Log record layout: timestamp with milliseconds, level name, message.
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
LOG_FORMAT = "%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATEFMT)

# Module logger; emits everything from DEBUG upwards.
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
|
|
|
|
# etcd directory holding this cluster's state. The cluster name is a Jinja
# placeholder that is substituted when the template is rendered.
ETCD_PATH = "/pxc-cluster/{{ percona.cluster_name }}"

# Identity of the local node: its fully qualified host name and the address
# that name resolves to.
HOSTNAME = socket.getfqdn()
IPADDR = socket.gethostbyname(HOSTNAME)
|
|
|
|
|
|
def retry(f):
    """Decorate ``f`` so that etcd errors are retried.

    The wrapped call is attempted up to CONNECTION_ATTEMPTS times in total.
    Every attempt except the last catches etcd.EtcdException, logs it and
    sleeps CONNECTION_DELAY before trying again; the last attempt runs
    unguarded, so its exception (if any) propagates to the caller.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        pause = CONNECTION_DELAY
        # All attempts but the final one swallow etcd errors.
        for _ in range(CONNECTION_ATTEMPTS - 1):
            try:
                return f(*args, **kwargs)
            except etcd.EtcdException as exc:
                LOG.warning('Etcd is not ready: %s', str(exc))
                LOG.warning('Retrying in %d seconds...', pause)
                time.sleep(pause)
        # Final attempt: any failure is reported to the caller.
        return f(*args, **kwargs)
    return wrapper
|
|
|
|
|
|
def get_etcd_client():
|
|
|
|
return etcd.Client(host="{{ address("etcd") }}",
|
|
port={{ etcd.client_port.cont }},
|
|
allow_reconnect=True,
|
|
read_timeout=2)
|
|
|
|
|
|
@retry
def fetch_status(etcd_client):
    """Return the entry names currently registered under ETCD_PATH.

    Reads ETCD_PATH through ``etcd_client`` and, for every leaf, removes the
    "<ETCD_PATH>/" prefix from its key. A leaf whose key is ETCD_PATH itself
    is skipped. The resulting list is logged and returned.
    """
    prefix = ETCD_PATH + "/"
    members = []
    for leaf in etcd_client.read(ETCD_PATH).leaves:
        leaf_key = str(leaf.key)
        if leaf_key == ETCD_PATH:
            continue
        members.append(leaf_key.replace(prefix, ''))
    LOG.info("Current cluster state is: %s", members)
    return members
|
|
|
|
|
|
def _etcd_set(etcd_client, data, ttl):
    """Store one entry under this node's directory in etcd.

    ``data`` is indexed as ``data[0]`` (entry name) and ``data[1]`` (value).
    The value is written to ETCD_PATH/IPADDR/<name> with the given ``ttl``
    and the write is logged.
    """
    field, value = data[0], data[1]
    target = os.path.join(ETCD_PATH, IPADDR, field)
    etcd_client.set(target, value, ttl=ttl)
    LOG.info("Set %s with value '%s'", target, value)
|
|
|
|
|
|
def _etcd_create_dir(etcd_client, ttl):
    """(Re)create this node's directory under ETCD_PATH with a ttl.

    If ETCD_PATH/IPADDR already exists it is treated as stale and deleted
    recursively first. The directory is then written with ``ttl`` and the
    result is logged.

    Only the stale-key cleanup is guarded by the ``try``: a missing key
    (never created, or gone between the lookup and the delete) simply means
    there is nothing to clean up. The directory write happens exactly once,
    outside the guard, so an error raised by the write itself is never
    mistaken for "key not found" and retried silently.
    """
    key = os.path.join(ETCD_PATH, IPADDR)
    try:
        etcd_client.get(key)
        LOG.warning("Found stale key '%s', deleting", key)
        etcd_client.delete(key, recursive=True, dir=True)
    except etcd.EtcdKeyNotFound:
        # Nothing (left) to remove; fall through to the create below.
        pass
    etcd_client.write(key, None, ttl=ttl, dir=True)
    LOG.info("Set ttl for '%s' directory to %s", key, ttl)
|
|
|
|
|
|
@retry
def set_status(etcd_client, ttl=30):
    """Publish this node's entries in etcd.

    Recreates the node directory via ``_etcd_create_dir`` and then stores
    three entries through ``_etcd_set``: 'ctime' (current time.time()),
    'ipaddr' (IPADDR) and 'hostname' (HOSTNAME), all with the given ``ttl``.
    """
    # NOTE: the client passed in is not used; a new one is created here on
    # every call (and therefore on every retry attempt).
    etcd_client = get_etcd_client()
    _etcd_create_dir(etcd_client, ttl)
    entries = (
        ('ctime', time.time()),
        ('ipaddr', IPADDR),
        ('hostname', HOSTNAME),
    )
    for entry in entries:
        _etcd_set(etcd_client, entry, ttl)
|
|
|
|
|
|
def create_join_list(status):
    """Return the comma-separated list of peers, excluding this node.

    ``status`` is an iterable of entry names (as returned by
    ``fetch_status``). Every entry equal to IPADDR is left out; the remaining
    entries are joined with ','. An empty string is returned when no other
    entry remains.

    The input is not modified, and a ``status`` that does not contain IPADDR
    is handled without raising.
    """
    return ','.join(addr for addr in status if addr != IPADDR)
|
|
|
|
|
|
def main(ttl):
    """Register this node in etcd and print the list of peers to join.

    Under the 'galera_bootstrap' etcd lock (acquired with ``lock_ttl=ttl``),
    the node publishes its own entries via ``set_status``, reads the current
    entries via ``fetch_status`` and prints the join list to stdout.

    Any exception raised along the way is logged with its traceback and not
    re-raised. The lock is released in ``finally`` only if the lock object
    was actually created, so a failure while building the client or the lock
    no longer surfaces as an unrelated unbound-variable error.
    """
    # NOTE(review): failures are logged and swallowed, so nothing is printed
    # and no exception escapes in that case - confirm callers expect that.
    lock = None
    try:
        etcd_client = get_etcd_client()
        lock = etcd.Lock(etcd_client, 'galera_bootstrap')
        LOG.info("Locking...")
        lock.acquire(blocking=True, lock_ttl=ttl)
        LOG.info("Successfuly acquired lock")
        set_status(etcd_client, ttl)
        status = fetch_status(etcd_client)
        # This output will be stdout == data
        print(create_join_list(status))
    except Exception as err:
        LOG.exception(err)
    finally:
        # ``lock`` stays None when client/lock construction failed.
        if lock is not None:
            lock.release()
            LOG.info("Successfuly released lock")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the bootstrap with a ttl of 60.
    main(ttl=60)
|