Moving all common code to the entrypoint_utils.py
Change-Id: If056dff7157ac20def8cc88e98360322ed393fba
This commit is contained in:
parent
a2be6ede7e
commit
851cee0cf8
@ -7,6 +7,7 @@ COPY sudoers /etc/sudoers.d/haproxy_sudoers
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y -t testing haproxy \
|
||||
&& apt-get clean \
|
||||
&& pip install pymysql \
|
||||
&& chown -R haproxy: /etc/haproxy /var/lib/haproxy \
|
||||
&& usermod -a -G microservices haproxy
|
||||
|
||||
|
124
service/files/entrypoint_utils.py
Normal file
124
service/files/entrypoint_utils.py
Normal file
@ -0,0 +1,124 @@
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
|
||||
import etcd
|
||||
import pymysql.cursors
|
||||
|
||||
# Logging setup executed at import time: basicConfig() installs the format
# below, so importing this module configures logging as a side effect.
# Timestamps are rendered as "YYYY-MM-DD HH:MM:SS" followed by milliseconds.
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
|
||||
LOG_FORMAT = "%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s"
|
||||
logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATEFMT)
|
||||
# Module logger used by every helper in this file; set to DEBUG level.
LOG = logging.getLogger(__name__)
|
||||
LOG.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
def retry(f=None, connection_attempts=3, connection_delay=1):
    """Wrap ``f`` so it is retried while etcd or MySQL are not ready.

    Supported call forms::

        @retry                                  # defaults
        retry(func, 5, 2)(*args)                # direct wrapping
        @retry(connection_attempts=5)           # decorator factory

    Args:
        f: callable to wrap. When omitted, a decorator bound to the given
            ``connection_attempts`` / ``connection_delay`` is returned.
        connection_attempts: total number of calls made before giving up.
            ``None`` or any value below 2 results in a single call.
        connection_delay: seconds to sleep between two attempts.

    Returns:
        The wrapped callable (or a decorator when ``f`` is omitted).

    Raises:
        Whatever ``f`` raises on its last attempt; exceptions other than
        ``etcd.EtcdException`` and ``pymysql.OperationalError`` propagate
        immediately from any attempt.
    """
    if f is None:
        # Called with keyword arguments only: hand back a decorator.
        return functools.partial(retry,
                                 connection_attempts=connection_attempts,
                                 connection_delay=connection_delay)

    def _wait(service, error):
        # Shared back-off step for both retryable exception types.
        LOG.warning('%s is not ready: %s', service, str(error))
        LOG.warning('Retrying in %s seconds...', connection_delay)
        time.sleep(connection_delay)

    @functools.wraps(f)
    def wrap(*args, **kwargs):
        # ``None`` is treated like "one attempt" so the comparison below
        # never has to order None against an int.
        remaining = connection_attempts or 1
        while remaining > 1:
            try:
                return f(*args, **kwargs)
            except etcd.EtcdException as e:
                _wait('Etcd', e)
            except pymysql.OperationalError as e:
                _wait('Mysql', e)
            remaining -= 1
        # Final attempt is unguarded so its exception reaches the caller.
        return f(*args, **kwargs)
    return wrap
|
||||
|
||||
|
||||
def get_config(globals_path, keys_list):
    """Read selected top-level sections from a JSON globals file.

    Args:
        globals_path: path of the JSON file to load.
        keys_list: iterable of top-level keys to copy from the file.

    Returns:
        dict mapping every requested key to the value found in the file.

    Raises:
        KeyError: a requested key is missing from the loaded mapping.
    """
    LOG.info("Getting global variables from %s", globals_path)
    with open(globals_path) as conf_file:
        loaded = json.load(conf_file)
    variables = dict((name, loaded[name]) for name in keys_list)
    # NOTE(review): the selected sections are logged verbatim; confirm that
    # none of them carries secrets before enabling DEBUG in production.
    LOG.debug(variables)
    return variables
|
||||
|
||||
|
||||
def get_etcd_client(etcd_machines):
    """
    Build an etcd client for the given endpoints.

    The endpoints are logged at debug level as "host:port" pairs before
    the client is created.

    Args:
        etcd_machines (tuple of tuples): ((host, port), (host, port), ...)

    Returns:
        etcd.Client created with allow_reconnect=True, read_timeout=1 and
        the 'http' protocol.
    """
    urls = []
    for host, port in etcd_machines:
        urls.append("%s:%d" % (host, port))
    LOG.debug("Using the following etcd urls: %s", " ".join(urls))
    client = etcd.Client(host=etcd_machines, allow_reconnect=True,
                         read_timeout=1, protocol='http')
    return client
|
||||
|
||||
|
||||
def get_mysql_client(host='127.0.0.1', unix_socket='', port=33306, user='root',
                     password=''):
    """Open a PyMySQL connection whose cursors return rows as dicts.

    Args:
        host: server address.
        unix_socket: socket path, passed to pymysql.connect unchanged.
        port: TCP port.
        user: account name.
        password: account password.

    Returns:
        The connection object returned by pymysql.connect, created with
        one-second connect and read timeouts.
    """
    options = {
        'host': host,
        'unix_socket': unix_socket,
        'port': port,
        'user': user,
        'password': password,
        'connect_timeout': 1,
        'read_timeout': 1,
        'cursorclass': pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**options)
|
||||
|
||||
|
||||
def etcd_set(etcd_client, key, value, ttl, dir=False, append=False, **kwargs):
    """Write ``value`` under ``key`` through ``etcd_client`` and log it.

    ``ttl``, ``dir`` and ``append`` are forwarded positionally to
    ``etcd_client.write``; extra keyword arguments are passed through.
    """
    write_args = (key, value, ttl, dir, append)
    etcd_client.write(*write_args, **kwargs)
    LOG.info("Set %s with value '%s'", key, value)
|
||||
|
||||
|
||||
def etcd_refresh(etcd_client, key, ttl):
    """Call ``etcd_client.refresh`` for ``key`` with ``ttl`` and log it."""
    refresh_args = (key, ttl)
    etcd_client.refresh(*refresh_args)
    LOG.info("Refreshed %s ttl. New ttl is '%s'", key, ttl)
|
||||
|
||||
|
||||
def etcd_delete(etcd_client, key, recursive=True, dir=True, **kwargs):
    """Delete ``key`` through ``etcd_client`` and log the removal.

    ``recursive`` and ``dir`` are forwarded positionally to
    ``etcd_client.delete``; extra keyword arguments are passed through.
    """
    delete_args = (key, recursive, dir)
    etcd_client.delete(*delete_args, **kwargs)
    LOG.warning("Deleted node's key '%s'", key)
|
||||
|
||||
|
||||
def etcd_read(etcd_client, key):
    """Return the ``value`` attribute of ``etcd_client.read(key)``."""
    node = etcd_client.read(key)
    return node.value
|
||||
|
||||
|
||||
def etcd_get(etcd_client, key):
    """Return whatever ``etcd_client.get(key)`` returns, unmodified."""
    result = etcd_client.get(key)
    return result
|
||||
|
||||
|
||||
def etcd_register_in_path(etcd_client, key, ttl):
    """Store the current ``time.time()`` value under ``key`` with ``ttl``."""
    registered_at = time.time()
    etcd_set(etcd_client, key, registered_at, ttl)
|
||||
|
||||
|
||||
def etcd_deregister_in_path(etcd_client, key, prevValue=False):
    """Delete ``key``, tolerating the case where it does not exist.

    Args:
        etcd_client: client handed to ``etcd_delete``.
        key: full etcd key to remove.
        prevValue: when truthy, it is forwarded to ``etcd_delete`` as
            ``prevValue`` with ``recursive=False`` and ``dir=False``;
            otherwise the key is deleted with ``recursive=True``.

    A missing key (``etcd.EtcdKeyNotFound``) is logged and swallowed; any
    other exception propagates.
    """
    try:
        if not prevValue:
            etcd_delete(etcd_client, key, recursive=True)
        else:
            etcd_delete(etcd_client, key, recursive=False, dir=False,
                        prevValue=prevValue)
        LOG.warning("Deleted key %s", key)
    except etcd.EtcdKeyNotFound:
        LOG.warning("Key %s not exist", key)
|
||||
|
||||
|
||||
# vim: set ts=4 sw=4 tw=0 et :
|
@ -2,17 +2,21 @@
|
||||
|
||||
import argparse
|
||||
import BaseHTTPServer
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import os.path
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
|
||||
import etcd
|
||||
import pymysql.cursors
|
||||
|
||||
from entrypoint_utils import retry
|
||||
from entrypoint_utils import get_config
|
||||
from entrypoint_utils import get_etcd_client
|
||||
from entrypoint_utils import get_mysql_client
|
||||
from entrypoint_utils import etcd_delete
|
||||
from entrypoint_utils import etcd_read
|
||||
from entrypoint_utils import etcd_register_in_path
|
||||
|
||||
# Galera states
|
||||
JOINING_STATE = 1
|
||||
@ -38,63 +42,19 @@ IPADDR = socket.gethostbyname(HOSTNAME)
|
||||
MONITOR_PASSWORD = None
|
||||
CLUSTER_NAME = None
|
||||
ETCD_PATH = None
|
||||
ETCD_HOST = None
|
||||
ETCD_PORT = None
|
||||
|
||||
|
||||
def retry(f):
|
||||
@functools.wraps(f)
|
||||
def wrap(*args, **kwargs):
|
||||
attempts = 3
|
||||
delay = 1
|
||||
while attempts > 1:
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except etcd.EtcdException as e:
|
||||
LOG.warning('Etcd is not ready: %s', str(e))
|
||||
LOG.warning('Retrying in %d seconds...', delay)
|
||||
time.sleep(delay)
|
||||
attempts -= 1
|
||||
except pymysql.OperationalError as e:
|
||||
LOG.warning('Mysql is not ready: %s', str(e))
|
||||
LOG.warning('Retrying in %d seconds...', delay)
|
||||
time.sleep(delay)
|
||||
attempts -= 1
|
||||
return f(*args, **kwargs)
|
||||
return wrap
|
||||
|
||||
|
||||
def get_etcd_client():
|
||||
|
||||
etcd_client = etcd.Client(host=ETCD_HOST,
|
||||
port=ETCD_PORT,
|
||||
allow_reconnect=True,
|
||||
read_timeout=2)
|
||||
return etcd_client
|
||||
|
||||
|
||||
@retry
|
||||
def get_mysql_client():
|
||||
mysql_client = pymysql.connect(host='127.0.0.1',
|
||||
port=33306,
|
||||
user='monitor',
|
||||
password=MONITOR_PASSWORD,
|
||||
connect_timeout=1,
|
||||
read_timeout=1,
|
||||
cursorclass=pymysql.cursors.DictCursor)
|
||||
return mysql_client
|
||||
ETCD_HOSTS = None
|
||||
|
||||
|
||||
class GaleraChecker(object):
|
||||
def __init__(self):
|
||||
self.etcd_client = get_etcd_client()
|
||||
self.etcd_client = get_etcd_client(ETCD_HOSTS)
|
||||
# Liveness check runs every 10 seconds with 5 seconds timeout (default)
|
||||
self.ttl = 20
|
||||
|
||||
@retry
|
||||
def fetch_wsrep_data(self):
|
||||
data = {}
|
||||
mysql_client = get_mysql_client()
|
||||
mysql_client = retry(get_mysql_client, 3)(user='monitor',
|
||||
password=MONITOR_PASSWORD)
|
||||
with mysql_client.cursor() as cursor:
|
||||
sql = "SHOW STATUS LIKE 'wsrep%'"
|
||||
cursor.execute(sql)
|
||||
@ -161,6 +121,7 @@ class GaleraChecker(object):
|
||||
global WAS_JOINED
|
||||
global OLD_STATE
|
||||
wsrep_data = self.fetch_wsrep_data()
|
||||
node_key = os.path.join(ETCD_PATH, 'nodes', IPADDR)
|
||||
|
||||
# If local uuid is different - we have a split brain.
|
||||
cluster_uuid = self.etcd_get_cluster_uuid()
|
||||
@ -178,7 +139,7 @@ class GaleraChecker(object):
|
||||
if state == SYNCED_STATE or state == DONOR_DESYNCED_STATE:
|
||||
WAS_JOINED = True
|
||||
LOG.info("State OK: %s", state_comment)
|
||||
self.etcd_register_in_path('nodes')
|
||||
etcd_register_in_path(self.etcd_client, node_key, self.ttl)
|
||||
return True
|
||||
elif state == JOINED_STATE and WAS_JOINED:
|
||||
# Node was in the JOINED_STATE in prev check too. Seems to it can't
|
||||
@ -186,62 +147,42 @@ class GaleraChecker(object):
|
||||
if OLD_STATE == JOINED_STATE:
|
||||
LOG.error("State BAD: %s", state_comment)
|
||||
LOG.error("Joined, but not syncing")
|
||||
self._etcd_delete()
|
||||
key = os.path.join(ETCD_PATH, 'nodes', IPADDR)
|
||||
etcd_delete(self.etcd_client, key)
|
||||
return False
|
||||
else:
|
||||
LOG.info("State OK: %s", state_comment)
|
||||
LOG.info("Probably will sync soon")
|
||||
self.etcd_register_in_path('nodes')
|
||||
etcd_register_in_path(self.etcd_client, node_key, self.ttl)
|
||||
return False
|
||||
else:
|
||||
LOG.info("State OK: %s", state_comment)
|
||||
LOG.info("Just joined")
|
||||
WAS_JOINED = True
|
||||
self.etcd_register_in_path('nodes')
|
||||
etcd_register_in_path(self.etcd_client, node_key, self.ttl)
|
||||
return True
|
||||
OLD_STATE = state
|
||||
LOG.warning("Unknown state: %s", state_comment)
|
||||
return True
|
||||
|
||||
@retry
|
||||
def _etcd_delete(self):
|
||||
|
||||
key = os.path.join(ETCD_PATH, 'nodes', IPADDR)
|
||||
self.etcd_client.delete(key, recursive=True, dir=True)
|
||||
LOG.warning("Deleted node's key '%s'", key)
|
||||
|
||||
@retry
|
||||
def _etcd_set(self, data):
|
||||
|
||||
self.etcd_client.set(data[0], data[1], self.ttl)
|
||||
LOG.info("Set %s with value '%s'", data[0], data[1])
|
||||
|
||||
@retry
|
||||
def _etcd_read(self, path):
|
||||
|
||||
key = os.path.join(ETCD_PATH, path)
|
||||
return self.etcd_client.read(key).value
|
||||
|
||||
def etcd_register_in_path(self, path):
|
||||
|
||||
key = os.path.join(ETCD_PATH, path, IPADDR)
|
||||
self._etcd_set((key, time.time()))
|
||||
|
||||
def etcd_check_if_cluster_ready(self):
|
||||
|
||||
key = os.path.join(ETCD_PATH, 'state')
|
||||
try:
|
||||
state = self._etcd_read('state')
|
||||
state = etcd_read(self.etcd_client, key)
|
||||
return True if state == 'STEADY' else False
|
||||
except etcd.EtcdKeyNotFound:
|
||||
return False
|
||||
|
||||
def etcd_get_cluster_uuid(self):
|
||||
|
||||
return self._etcd_read('uuid')
|
||||
key = os.path.join(ETCD_PATH, 'uuid')
|
||||
return etcd_read(self.etcd_client, key)
|
||||
|
||||
def check_cluster_state(self):
|
||||
|
||||
state = self._etcd_read('state')
|
||||
key = os.path.join(ETCD_PATH, 'state')
|
||||
state = etcd_read(self.etcd_client, key)
|
||||
if state != 'STEADY':
|
||||
LOG.error("Cluster state is not STEADY")
|
||||
sys.exit(1)
|
||||
@ -281,29 +222,17 @@ def run_readiness():
|
||||
sys.exit(0) if ready else sys.exit(1)
|
||||
|
||||
|
||||
def get_config():
|
||||
def set_globals(config):
|
||||
|
||||
LOG.info("Getting global variables from %s", GLOBALS_PATH)
|
||||
variables = {}
|
||||
with open(GLOBALS_PATH) as f:
|
||||
global_conf = json.load(f)
|
||||
for key in ['percona', 'etcd', 'namespace']:
|
||||
variables[key] = global_conf[key]
|
||||
LOG.debug(variables)
|
||||
return variables
|
||||
|
||||
|
||||
def set_globals():
|
||||
|
||||
config = get_config()
|
||||
global MONITOR_PASSWORD, CLUSTER_NAME
|
||||
global ETCD_PATH, ETCD_HOST, ETCD_PORT
|
||||
global ETCD_PATH, ETCD_HOSTS, ETCD_PORT
|
||||
|
||||
CLUSTER_NAME = config['percona']['cluster_name']
|
||||
MONITOR_PASSWORD = config['percona']['monitor_password']
|
||||
ETCD_PATH = "/galera/%s" % config['percona']['cluster_name']
|
||||
ETCD_HOST = "etcd.%s" % config['namespace']
|
||||
ETCD_PORT = int(config['etcd']['client_port']['cont'])
|
||||
etcd_host = "etcd.%s.svc.cluster.local" % config['namespace']
|
||||
etcd_port = int(config['etcd']['client_port']['cont'])
|
||||
ETCD_HOSTS = ((etcd_host, etcd_port),)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@ -312,8 +241,8 @@ if __name__ == "__main__":
|
||||
parser.add_argument('type', choices=['liveness', 'readiness'])
|
||||
args = parser.parse_args()
|
||||
|
||||
get_config()
|
||||
set_globals()
|
||||
config = get_config(GLOBALS_PATH, ['percona', 'etcd', 'namespace'])
|
||||
set_globals(config)
|
||||
if args.type == 'liveness':
|
||||
run_liveness()
|
||||
elif args.type == 'readiness':
|
||||
|
@ -1,8 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import argparse
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
@ -12,6 +10,12 @@ import time
|
||||
|
||||
import etcd
|
||||
|
||||
from entrypoint_utils import retry
|
||||
from entrypoint_utils import get_config
|
||||
from entrypoint_utils import get_etcd_client
|
||||
from entrypoint_utils import etcd_set
|
||||
from entrypoint_utils import etcd_refresh
|
||||
|
||||
HOSTNAME = socket.getfqdn()
|
||||
IPADDR = socket.gethostbyname(HOSTNAME)
|
||||
BACKEND_NAME = "galera-cluster"
|
||||
@ -27,62 +31,24 @@ LOG.setLevel(logging.DEBUG)
|
||||
CONNECTION_ATTEMPTS = None
|
||||
CONNECTION_DELAY = None
|
||||
ETCD_PATH = None
|
||||
ETCD_HOST = None
|
||||
ETCD_PORT = None
|
||||
ETCD_HOSTS = None
|
||||
|
||||
# Haproxy constant for health checks
|
||||
SRV_STATE_RUNNING = 2
|
||||
SRV_CHK_RES_PASSED = 3
|
||||
|
||||
|
||||
def retry(f):
|
||||
@functools.wraps(f)
|
||||
def wrap(*args, **kwargs):
|
||||
attempts = CONNECTION_ATTEMPTS
|
||||
delay = CONNECTION_DELAY
|
||||
while attempts > 1:
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except etcd.EtcdException as e:
|
||||
LOG.warning('Etcd is not ready: %s', str(e))
|
||||
LOG.warning('Retrying in %d seconds...', delay)
|
||||
time.sleep(delay)
|
||||
attempts -= 1
|
||||
return f(*args, **kwargs)
|
||||
return wrap
|
||||
def set_globals(config):
|
||||
|
||||
|
||||
def get_config():
|
||||
|
||||
LOG.info("Getting global variables from %s", GLOBALS_PATH)
|
||||
variables = {}
|
||||
with open(GLOBALS_PATH) as f:
|
||||
global_conf = json.load(f)
|
||||
for key in ['percona', 'etcd', 'namespace']:
|
||||
variables[key] = global_conf[key]
|
||||
LOG.debug(variables)
|
||||
return variables
|
||||
|
||||
|
||||
def set_globals():
|
||||
|
||||
config = get_config()
|
||||
global CONNECTION_ATTEMPTS, CONNECTION_DELAY
|
||||
global ETCD_PATH, ETCD_HOST, ETCD_PORT
|
||||
global ETCD_PATH, ETCD_HOSTS
|
||||
|
||||
CONNECTION_ATTEMPTS = config['etcd']['connection_attempts']
|
||||
CONNECTION_DELAY = config['etcd']['connection_delay']
|
||||
ETCD_PATH = "/galera/%s" % config['percona']['cluster_name']
|
||||
ETCD_HOST = "etcd.%s" % config['namespace']
|
||||
ETCD_PORT = int(config['etcd']['client_port']['cont'])
|
||||
|
||||
|
||||
def get_etcd_client():
|
||||
|
||||
return etcd.Client(host=ETCD_HOST,
|
||||
port=ETCD_PORT,
|
||||
allow_reconnect=True,
|
||||
read_timeout=2)
|
||||
etcd_host = "etcd.%s.svc.cluster.local" % config['namespace']
|
||||
etcd_port = int(config['etcd']['client_port']['cont'])
|
||||
ETCD_HOSTS = ((etcd_host, etcd_port),)
|
||||
|
||||
|
||||
def get_socket():
|
||||
@ -109,21 +75,6 @@ def check_haproxy(proc):
|
||||
sys.exit(proc.returncode)
|
||||
|
||||
|
||||
@retry
|
||||
def etcd_set(etcd_client, key, value, ttl, dir=False, append=False, **kwargs):
|
||||
|
||||
etcd_client.write(key, value, ttl, dir, append, **kwargs)
|
||||
LOG.info("Set %s with value '%s'", key, value)
|
||||
|
||||
|
||||
@retry
|
||||
def etcd_refresh(etcd_client, path, ttl):
|
||||
|
||||
key = os.path.join(ETCD_PATH, path)
|
||||
etcd_client.refresh(key, ttl)
|
||||
LOG.info("Refreshed %s ttl. New ttl is '%s'", key, ttl)
|
||||
|
||||
|
||||
def send_command(cmd):
|
||||
|
||||
LOG.debug("Sending '%s' cmd to haproxy", cmd)
|
||||
@ -210,13 +161,14 @@ def get_leader(etcd_client):
|
||||
def set_leader(etcd_client, ttl, **kwargs):
|
||||
|
||||
key = os.path.join(ETCD_PATH, 'leader')
|
||||
etcd_set(etcd_client, key, IPADDR, ttl, **kwargs)
|
||||
retry(etcd_set, CONNECTION_ATTEMPTS)(etcd_client, key, IPADDR, ttl,
|
||||
**kwargs)
|
||||
|
||||
|
||||
def refresh_leader(etcd_client, ttl):
|
||||
|
||||
key = os.path.join(ETCD_PATH, 'leader')
|
||||
etcd_refresh(etcd_client, key, ttl)
|
||||
retry(etcd_refresh, CONNECTION_ATTEMPTS)(etcd_client, key, ttl)
|
||||
|
||||
|
||||
def do_we_need_to_reconfigure_haproxy(leader):
|
||||
@ -233,7 +185,7 @@ def run_daemon(ttl):
|
||||
|
||||
LOG.debug("My IP is: %s", IPADDR)
|
||||
haproxy_proc = run_haproxy()
|
||||
etcd_client = get_etcd_client()
|
||||
etcd_client = get_etcd_client(ETCD_HOSTS)
|
||||
while True:
|
||||
wait_for_cluster_to_be_steady(etcd_client, haproxy_proc)
|
||||
leader = get_leader(etcd_client)
|
||||
@ -253,7 +205,7 @@ def run_daemon(ttl):
|
||||
|
||||
def run_readiness():
|
||||
|
||||
etcd_client = get_etcd_client()
|
||||
etcd_client = get_etcd_client(ETCD_HOSTS)
|
||||
state = get_cluster_state(etcd_client)
|
||||
if state != 'STEADY':
|
||||
LOG.error("Cluster is not in the STEADY state")
|
||||
@ -281,8 +233,8 @@ if __name__ == "__main__":
|
||||
parser.add_argument('type', choices=['daemon', 'readiness'])
|
||||
args = parser.parse_args()
|
||||
|
||||
get_config()
|
||||
set_globals()
|
||||
config = get_config(GLOBALS_PATH, ['percona', 'etcd', 'namespace'])
|
||||
set_globals(config)
|
||||
if args.type == 'daemon':
|
||||
run_daemon(ttl=20)
|
||||
elif args.type == 'readiness':
|
||||
|
@ -1,8 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import fileinput
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import os.path
|
||||
@ -15,7 +13,15 @@ import sys
|
||||
import time
|
||||
|
||||
import etcd
|
||||
import pymysql.cursors
|
||||
|
||||
from entrypoint_utils import retry
|
||||
from entrypoint_utils import get_config
|
||||
from entrypoint_utils import get_etcd_client
|
||||
from entrypoint_utils import get_mysql_client
|
||||
from entrypoint_utils import etcd_set
|
||||
from entrypoint_utils import etcd_get
|
||||
from entrypoint_utils import etcd_register_in_path
|
||||
from entrypoint_utils import etcd_deregister_in_path
|
||||
|
||||
HOSTNAME = socket.getfqdn()
|
||||
IPADDR = socket.gethostbyname(HOSTNAME)
|
||||
@ -31,6 +37,7 @@ logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATEFMT)
|
||||
LOG = logging.getLogger(__name__)
|
||||
LOG.setLevel(logging.DEBUG)
|
||||
|
||||
MYSQL_SOCK = '/var/run/mysqld/mysqld.sock'
|
||||
FORCE_BOOTSTRAP = None
|
||||
FORCE_BOOTSTRAP_NODE = None
|
||||
EXPECTED_NODES = None
|
||||
@ -41,8 +48,7 @@ MONITOR_PASSWORD = None
|
||||
CONNECTION_ATTEMPTS = None
|
||||
CONNECTION_DELAY = None
|
||||
ETCD_PATH = None
|
||||
ETCD_HOST = None
|
||||
ETCD_PORT = None
|
||||
ETCD_HOSTS = None
|
||||
|
||||
|
||||
class ProcessException(Exception):
|
||||
@ -52,41 +58,11 @@ class ProcessException(Exception):
|
||||
super(ProcessException, self).__init__(self.msg)
|
||||
|
||||
|
||||
def retry(f):
|
||||
@functools.wraps(f)
|
||||
def wrap(*args, **kwargs):
|
||||
attempts = CONNECTION_ATTEMPTS
|
||||
delay = CONNECTION_DELAY
|
||||
while attempts > 1:
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except etcd.EtcdException as e:
|
||||
LOG.warning('Etcd is not ready: %s', str(e))
|
||||
LOG.warning('Retrying in %d seconds...', delay)
|
||||
time.sleep(delay)
|
||||
attempts -= 1
|
||||
return f(*args, **kwargs)
|
||||
return wrap
|
||||
def set_globals(config):
|
||||
|
||||
|
||||
def get_config():
|
||||
|
||||
LOG.info("Getting global variables from %s", GLOBALS_PATH)
|
||||
variables = {}
|
||||
with open(GLOBALS_PATH) as f:
|
||||
global_conf = json.load(f)
|
||||
for key in ['percona', 'db', 'etcd', 'namespace']:
|
||||
variables[key] = global_conf[key]
|
||||
LOG.debug(variables)
|
||||
return variables
|
||||
|
||||
|
||||
def set_globals():
|
||||
|
||||
config = get_config()
|
||||
global MYSQL_ROOT_PASSWORD, CLUSTER_NAME, XTRABACKUP_PASSWORD
|
||||
global MONITOR_PASSWORD, CONNECTION_ATTEMPTS, CONNECTION_DELAY
|
||||
global ETCD_PATH, ETCD_HOST, ETCD_PORT, EXPECTED_NODES
|
||||
global ETCD_PATH, ETCD_HOSTS, EXPECTED_NODES
|
||||
global FORCE_BOOTSTRAP, FORCE_BOOTSTRAP_NODE
|
||||
|
||||
FORCE_BOOTSTRAP = config['percona']['force_bootstrap']['enabled']
|
||||
@ -99,27 +75,9 @@ def set_globals():
|
||||
CONNECTION_DELAY = config['etcd']['connection_delay']
|
||||
EXPECTED_NODES = int(config['percona']['cluster_size'])
|
||||
ETCD_PATH = "/galera/%s" % config['percona']['cluster_name']
|
||||
ETCD_HOST = "etcd.%s" % config['namespace']
|
||||
ETCD_PORT = int(config['etcd']['client_port']['cont'])
|
||||
|
||||
|
||||
def get_mysql_client(insecure=False):
|
||||
|
||||
password = '' if insecure else MYSQL_ROOT_PASSWORD
|
||||
return pymysql.connect(unix_socket='/var/run/mysqld/mysqld.sock',
|
||||
user='root',
|
||||
password=password,
|
||||
connect_timeout=1,
|
||||
read_timeout=1,
|
||||
cursorclass=pymysql.cursors.DictCursor)
|
||||
|
||||
|
||||
def get_etcd_client():
|
||||
|
||||
return etcd.Client(host=ETCD_HOST,
|
||||
port=ETCD_PORT,
|
||||
allow_reconnect=True,
|
||||
read_timeout=2)
|
||||
etcd_host = "etcd.%s.svc.cluster.local" % config['namespace']
|
||||
etcd_port = int(config['etcd']['client_port']['cont'])
|
||||
ETCD_HOSTS = ((etcd_host, etcd_port),)
|
||||
|
||||
|
||||
def datadir_cleanup(path):
|
||||
@ -169,10 +127,10 @@ def run_mysqld(available_nodes, etcd_client, lock):
|
||||
|
||||
def sig_handler(signum, frame):
|
||||
LOG.info("Caught a signal: %d", signum)
|
||||
etcd_deregister_in_path(etcd_client, 'queue')
|
||||
etcd_deregister_in_path(etcd_client, 'nodes')
|
||||
etcd_deregister_in_path(etcd_client, 'seqno')
|
||||
etcd_deregister_in_path(etcd_client, 'leader', prevValue=IPADDR)
|
||||
for path in ['seqno', 'queue', 'nodes', 'leader']:
|
||||
key = os.path.join(ETCD_PATH, path, IPADDR)
|
||||
prevValue = IPADDR if path == 'leader' else False
|
||||
etcd_deregister_in_path(etcd_client, key, prevValue=prevValue)
|
||||
release_lock(lock)
|
||||
mysqld_proc.send_signal(signum)
|
||||
|
||||
@ -190,12 +148,11 @@ def mysql_exec(mysql_client, sql_list):
|
||||
return cursor.fetchall()
|
||||
|
||||
|
||||
@retry
|
||||
def fetch_status(etcd_client, path):
|
||||
|
||||
key = os.path.join(ETCD_PATH, path)
|
||||
try:
|
||||
root = etcd_client.get(key)
|
||||
root = etcd_get(etcd_client, key)
|
||||
except etcd.EtcdKeyNotFound:
|
||||
LOG.debug("Current nodes in %s is: %s", key, None)
|
||||
return []
|
||||
@ -210,14 +167,14 @@ def fetch_status(etcd_client, path):
|
||||
def fetch_wsrep_data():
|
||||
|
||||
wsrep_data = {}
|
||||
mysql_client = get_mysql_client()
|
||||
mysql_client = retry(get_mysql_client, 3)(unix_socket=MYSQL_SOCK,
|
||||
password=MYSQL_ROOT_PASSWORD)
|
||||
data = mysql_exec(mysql_client, [("SHOW STATUS LIKE 'wsrep%'", None)])
|
||||
for i in data:
|
||||
wsrep_data[i['Variable_name']] = i['Value']
|
||||
return wsrep_data
|
||||
|
||||
|
||||
@retry
|
||||
def get_oldest_node_by_seqno(etcd_client, path):
|
||||
|
||||
"""
|
||||
@ -228,7 +185,7 @@ def get_oldest_node_by_seqno(etcd_client, path):
|
||||
|
||||
"""
|
||||
key = os.path.join(ETCD_PATH, path)
|
||||
root = etcd_client.get(key)
|
||||
root = retry(etcd_get, CONNECTION_ATTEMPTS)(etcd_client, key)
|
||||
# We need to cut etcd path prefix like "/galera/k8scluster/seqno/" to get
|
||||
# the IP addr of the node.
|
||||
prefix = key + "/"
|
||||
@ -240,38 +197,11 @@ def get_oldest_node_by_seqno(etcd_client, path):
|
||||
return result[-1][0]
|
||||
|
||||
|
||||
@retry
|
||||
def _etcd_set(etcd_client, path, value, ttl):
|
||||
|
||||
key = os.path.join(ETCD_PATH, path)
|
||||
etcd_client.set(key, value, ttl=ttl)
|
||||
LOG.info("Set %s with value '%s'", key, value)
|
||||
|
||||
|
||||
def etcd_register_in_path(etcd_client, path, ttl=60):
|
||||
|
||||
key = os.path.join(path, IPADDR)
|
||||
_etcd_set(etcd_client, key, time.time(), ttl)
|
||||
|
||||
|
||||
def etcd_set_seqno(etcd_client, ttl):
|
||||
|
||||
seqno = mysql_get_seqno()
|
||||
key = os.path.join('seqno', IPADDR)
|
||||
_etcd_set(etcd_client, key, seqno, ttl)
|
||||
|
||||
|
||||
def etcd_deregister_in_path(etcd_client, path, prevValue=False):
|
||||
|
||||
key = os.path.join(ETCD_PATH, path, IPADDR)
|
||||
try:
|
||||
if prevValue:
|
||||
etcd_client.delete(key, prevValue=prevValue)
|
||||
else:
|
||||
etcd_client.delete(key, recursive=True)
|
||||
LOG.warning("Deleted key %s", key)
|
||||
except etcd.EtcdKeyNotFound:
|
||||
LOG.warning("Key %s not exist", key)
|
||||
key = os.path.join(ETCD_PATH, 'seqno', IPADDR)
|
||||
retry(etcd_set, CONNECTION_ATTEMPTS)(etcd_client, key, seqno, ttl)
|
||||
|
||||
|
||||
def mysql_get_seqno():
|
||||
@ -434,12 +364,14 @@ def update_uuid(etcd_client):
|
||||
|
||||
wsrep_data = fetch_wsrep_data()
|
||||
uuid = wsrep_data['wsrep_cluster_state_uuid']
|
||||
_etcd_set(etcd_client, 'uuid', uuid, ttl=None)
|
||||
key = os.path.join(ETCD_PATH, 'uuid')
|
||||
retry(etcd_set, CONNECTION_ATTEMPTS)(etcd_client, key, uuid, ttl=None)
|
||||
|
||||
|
||||
def update_cluster_state(etcd_client, state):
|
||||
|
||||
_etcd_set(etcd_client, 'state', state, ttl=None)
|
||||
key = os.path.join(ETCD_PATH, 'state')
|
||||
retry(etcd_set, CONNECTION_ATTEMPTS)(etcd_client, key, state, ttl=None)
|
||||
|
||||
|
||||
def wait_for_mysqld(proc):
|
||||
@ -452,9 +384,11 @@ def wait_for_mysqld(proc):
|
||||
def wait_for_mysqld_to_start(proc, insecure):
|
||||
|
||||
LOG.info("Waiting mysql to start...")
|
||||
for i in range(0, 29):
|
||||
password = '' if insecure else MYSQL_ROOT_PASSWORD
|
||||
for i in range(0, 199):
|
||||
try:
|
||||
mysql_client = get_mysql_client(insecure=insecure)
|
||||
mysql_client = get_mysql_client(unix_socket=MYSQL_SOCK,
|
||||
password=password)
|
||||
mysql_exec(mysql_client, [("SELECT 1", None)])
|
||||
return
|
||||
except Exception:
|
||||
@ -507,7 +441,7 @@ def mysql_init():
|
||||
("DROP DATABASE IF EXISTS test", None),
|
||||
("FLUSH PRIVILEGES", None)]
|
||||
try:
|
||||
mysql_client = get_mysql_client(insecure=True)
|
||||
mysql_client = retry(get_mysql_client, 3, 1)(unix_socket=MYSQL_SOCK)
|
||||
mysql_exec(mysql_client, sql_list)
|
||||
except Exception:
|
||||
raise
|
||||
@ -565,7 +499,8 @@ def run_create_queue(etcd_client, lock, ttl):
|
||||
"""
|
||||
|
||||
LOG.info("Creating recovery queue")
|
||||
etcd_register_in_path(etcd_client, 'queue', ttl=None)
|
||||
key = os.path.join(ETCD_PATH, 'queue', IPADDR)
|
||||
etcd_register_in_path(etcd_client, key, ttl=None)
|
||||
etcd_set_seqno(etcd_client, ttl=None)
|
||||
release_lock(lock)
|
||||
wait_for_expected_state(etcd_client, ttl)
|
||||
@ -598,10 +533,12 @@ def run_join_cluster(etcd_client, lock, ttl):
|
||||
set_safe_to_bootstrap()
|
||||
mysqld = run_mysqld(available_nodes, etcd_client, lock)
|
||||
wait_for_sync(mysqld)
|
||||
etcd_register_in_path(etcd_client, 'nodes', ttl)
|
||||
key = os.path.join(ETCD_PATH, 'nodes', IPADDR)
|
||||
etcd_register_in_path(etcd_client, key, ttl)
|
||||
if state == "RECOVERY":
|
||||
etcd_deregister_in_path(etcd_client, 'seqno')
|
||||
etcd_deregister_in_path(etcd_client, 'queue')
|
||||
for path in ['seqno', 'queue']:
|
||||
key = os.path.join(ETCD_PATH, path, IPADDR)
|
||||
etcd_deregister_in_path(etcd_client, key)
|
||||
last_one = check_if_im_last(etcd_client)
|
||||
release_lock(lock)
|
||||
return (first_one, last_one, mysqld)
|
||||
@ -643,7 +580,7 @@ def main(ttl):
|
||||
|
||||
try:
|
||||
LOG.debug("My IP is: %s", IPADDR)
|
||||
etcd_client = get_etcd_client()
|
||||
etcd_client = retry(get_etcd_client, CONNECTION_ATTEMPTS)(ETCD_HOSTS)
|
||||
lock = etcd.Lock(etcd_client, 'galera')
|
||||
acquire_lock(lock, ttl)
|
||||
check_cluster(etcd_client)
|
||||
@ -678,16 +615,16 @@ def main(ttl):
|
||||
except Exception:
|
||||
raise
|
||||
finally:
|
||||
etcd_deregister_in_path(etcd_client, 'queue')
|
||||
etcd_deregister_in_path(etcd_client, 'nodes')
|
||||
etcd_deregister_in_path(etcd_client, 'seqno')
|
||||
etcd_deregister_in_path(etcd_client, 'leader', prevValue=IPADDR)
|
||||
for path in ['seqno', 'queue', 'nodes', 'leader']:
|
||||
key = os.path.join(ETCD_PATH, path, IPADDR)
|
||||
prevValue = IPADDR if path == 'leader' else False
|
||||
etcd_deregister_in_path(etcd_client, key, prevValue=prevValue)
|
||||
release_lock(lock)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
get_config()
|
||||
set_globals()
|
||||
config = get_config(GLOBALS_PATH, ['percona', 'db', 'etcd', 'namespace'])
|
||||
set_globals(config)
|
||||
main(ttl=300)
|
||||
|
||||
# vim: set ts=4 sw=4 tw=0 et :
|
||||
|
@ -15,6 +15,7 @@ service:
|
||||
daemon:
|
||||
files:
|
||||
- galera-checker
|
||||
- entrypoint_utils
|
||||
dependencies:
|
||||
- etcd
|
||||
command: "/opt/ccp/bin/galera_checker.py liveness"
|
||||
@ -31,6 +32,7 @@ service:
|
||||
files:
|
||||
- haproxy-conf
|
||||
- haproxy_entrypoint
|
||||
- entrypoint_utils
|
||||
dependencies:
|
||||
- etcd
|
||||
command: "/opt/ccp/bin/haproxy_entrypoint.py daemon"
|
||||
@ -63,6 +65,7 @@ service:
|
||||
- entrypoint
|
||||
- mycnf
|
||||
- galera-checker
|
||||
- entrypoint_utils
|
||||
dependencies:
|
||||
- etcd
|
||||
command: /opt/ccp/bin/entrypoint.py
|
||||
@ -86,3 +89,6 @@ files:
|
||||
path: /opt/ccp/bin/haproxy_entrypoint.py
|
||||
content: haproxy_entrypoint.py
|
||||
perm: "0755"
|
||||
entrypoint_utils:
|
||||
path: /opt/ccp/bin/entrypoint_utils.py
|
||||
content: entrypoint_utils.py
|
||||
|
Loading…
Reference in New Issue
Block a user