openstack-helm-infra/mariadb/templates/bin/_mariadb-wait-for-cluster.py.tpl
Sergiy Markin f630c152e8 Mariadb chart updates
This patch set improves the wait_for_cluster job of the mariadb chart.

Change-Id: I46de32243e3aaa98b7e3e8c132a84d7b65d657cc
2024-11-01 22:34:58 +00:00

191 lines
5.7 KiB
Smarty

#!/usr/bin/env python3
import datetime
from enum import Enum
import logging
import os
import sys
import time
import pymysql
import pykube
# ---------------------------------------------------------------------------
# Runtime configuration, read once from the process environment.
# ---------------------------------------------------------------------------

# Connection details for the MariaDB endpoint that is polled.
MARIADB_HOST = os.environ.get("MARIADB_HOST")
MARIADB_PASSWORD = os.environ.get("MARIADB_PASSWORD")

# Kept as the raw environment string (or None); it is compared against the
# string value of wsrep_cluster_size later on.
MARIADB_REPLICAS = os.environ.get("MARIADB_REPLICAS")

# Verbosity of this script's logger.
MARIADB_CLUSTER_STATE_LOG_LEVEL = os.environ.get(
    "MARIADB_CLUSTER_STATE_LOG_LEVEL", "INFO"
)

# Stability threshold and the two sleep intervals (seconds) of the wait loop.
MARIADB_CLUSTER_STABILITY_COUNT = int(
    os.environ.get("MARIADB_CLUSTER_STABILITY_COUNT", "30")
)
MARIADB_CLUSTER_STABILITY_WAIT = int(
    os.environ.get("MARIADB_CLUSTER_STABILITY_WAIT", "4")
)
MARIADB_CLUSTER_CHECK_WAIT = int(
    os.environ.get("MARIADB_CLUSTER_CHECK_WAIT", "30")
)

# Name and namespace of the ConfigMap that stores the cluster state.
MARIADB_CLUSTER_STATE_CONFIGMAP = os.environ.get(
    "MARIADB_CLUSTER_STATE_CONFIGMAP"
)
MARIADB_CLUSTER_STATE_CONFIGMAP_NAMESPACE = os.environ.get(
    "MARIADB_CLUSTER_STATE_CONFIGMAP_NAMESPACE", "openstack"
)

# Timeout handed to the pykube HTTP client.
MARIADB_CLUSTER_STATE_PYKUBE_REQUEST_TIMEOUT = int(
    os.environ.get("MARIADB_CLUSTER_STATE_PYKUBE_REQUEST_TIMEOUT", "60")
)

# ---------------------------------------------------------------------------
# Logging: everything goes to stdout; only this script's logger has its level
# set explicitly.
# ---------------------------------------------------------------------------
log_level = MARIADB_CLUSTER_STATE_LOG_LEVEL
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
LOG = logging.getLogger("mariadb-cluster-wait")
LOG.setLevel(log_level)
def login():
    """Create the Kubernetes API client used by this script.

    The kube configuration is obtained with ``pykube.KubeConfig.from_env()``
    and the client uses ``MARIADB_CLUSTER_STATE_PYKUBE_REQUEST_TIMEOUT`` as
    its request timeout.

    Returns:
        The ``pykube.HTTPClient`` that was created.
    """
    config = pykube.KubeConfig.from_env()
    http_client = pykube.HTTPClient(
        timeout=MARIADB_CLUSTER_STATE_PYKUBE_REQUEST_TIMEOUT,
        config=config,
    )
    LOG.info(f"Created k8s api client from context {config.current_context}")
    return http_client
# Shared Kubernetes API client for the rest of the script.
api = login()

# Look up the cluster-state ConfigMap by name in its namespace. This runs at
# module load time and is not wrapped by the retry decorator.
cluster_state_map = pykube.ConfigMap.objects(api).filter(
    namespace=MARIADB_CLUSTER_STATE_CONFIGMAP_NAMESPACE
).get_by_name(MARIADB_CLUSTER_STATE_CONFIGMAP)
def get_current_state(cluster_state_map):
    """Return the stored initial-bootstrap state from ``cluster_state_map``.

    Args:
        cluster_state_map: object exposing ``get(key, default)``, e.g. a dict
            of ConfigMap data.
            NOTE(review): the module-level ``cluster_state_map`` is a pykube
            ConfigMap object; confirm whether callers are expected to pass
            that object or its data mapping.

    Returns:
        The value stored under the bootstrap-completed key, or the string
        ``"False"`` when the key is absent.
    """
    # The previous code looked up MARIADB_CLUSTER_STATE_INITIAL_BOOTSTRAP_COMPLETED_KEY,
    # a name that is never defined in this file (NameError on first call), and
    # dropped the result. Use the same key string that initalClusterState
    # writes, and actually return what was found.
    state_key = "initial-bootstrap-completed.cluster"
    return cluster_state_map.get(state_key, "False")
def retry(times, exceptions):
    """Build a decorator that re-invokes a callable when it raises.

    The decorated callable is tried up to ``times`` times; each failure that
    matches ``exceptions`` is logged with its traceback and swallowed. If all
    of those attempts fail, the callable is invoked one final time outside
    the guarded loop, so that call's exception (if any) reaches the caller.
    Attempts are made back to back, with no delay in between.

    Args:
        times: number of attempts whose matching exceptions are swallowed.
        exceptions: exception class, or tuple of classes, that triggers a
            retry. Anything else propagates immediately.

    Returns:
        A decorator; the function it returns forwards all positional and
        keyword arguments to the wrapped callable and returns its result.
    """
    # Local import keeps this block self-contained (stdlib only).
    import functools

    def decorator(func):
        # wraps() preserves the wrapped callable's __name__/__doc__, which
        # would otherwise be replaced by those of ``newfn``.
        @functools.wraps(func)
        def newfn(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    LOG.exception(
                        f"Exception thrown when attempting to run {func}, attempt {attempt} of {times}"
                    )
            # Final, unguarded attempt: lets the real error surface.
            return func(*args, **kwargs)

        return newfn

    return decorator
class initalClusterState:
    """Track, through a ConfigMap key, whether the initial bootstrap is done.

    The state lives in the ConfigMap's data under ``initial_state_key``; the
    value written by this class is the string ``"COMPLETED"``.
    """

    # ConfigMap data key holding the bootstrap state.
    initial_state_key = "initial-bootstrap-completed.cluster"

    @retry(times=100, exceptions=(Exception,))
    def __init__(self, api, namespace, name):
        """Fetch the ConfigMap ``name`` in ``namespace`` using ``api``.

        Args:
            api: pykube HTTP client.
            namespace: namespace of the state ConfigMap.
            name: name of the state ConfigMap.
        """
        self.namespace = namespace
        self.name = name
        self.cm = (
            pykube.ConfigMap.objects(api)
            .filter(namespace=self.namespace)
            .get_by_name(self.name)
        )

    def get_default(self):
        """Derive the state when the ConfigMap does not contain the key.

        Some deployments finished this job before the key was recorded. To
        avoid manual intervention on update/restart, a ConfigMap created more
        than one hour ago is assumed to belong to an existing, already
        bootstrapped environment; in that case the key is written via
        ``complete()``.

        Returns:
            ``"COMPLETED"`` when the ConfigMap is older than one hour,
            otherwise ``"NOT_COMPLETED"``.
        """
        utc = datetime.timezone.utc
        # Timezone-aware arithmetic; the timestamp format carries a literal
        # "Z", so the parsed value is tagged as UTC explicitly.
        now = datetime.datetime.now(utc)
        created_at = datetime.datetime.strptime(
            self.cm.obj["metadata"]["creationTimestamp"], "%Y-%m-%dT%H:%M:%SZ"
        ).replace(tzinfo=utc)
        if now - created_at > datetime.timedelta(seconds=3600):
            self.complete()
            return "COMPLETED"
        return "NOT_COMPLETED"

    @property
    @retry(times=10, exceptions=(Exception,))
    def is_completed(self):
        """Whether the initial bootstrap is recorded (or assumed) complete.

        Reloads the ConfigMap on every access.

        Returns:
            bool: True when the stored value is ``"COMPLETED"``, or, if the
            key is absent, when ``get_default()`` yields ``"COMPLETED"``.
        """
        self.cm.reload()
        # Tolerate a ConfigMap object with no (or a null) "data" entry.
        data = self.cm.obj.get("data") or {}
        if self.initial_state_key in data:
            # Compare explicitly: returning the raw string would treat any
            # non-empty value as "completed".
            return data[self.initial_state_key] == "COMPLETED"
        return self.get_default() == "COMPLETED"

    @retry(times=100, exceptions=(Exception,))
    def complete(self):
        """Patch the ConfigMap so the state key holds ``"COMPLETED"``."""
        patch = {"data": {self.initial_state_key: "COMPLETED"}}
        self.cm.patch(patch)
# State tracker backed by the cluster-state ConfigMap.
ics = initalClusterState(
    api,
    MARIADB_CLUSTER_STATE_CONFIGMAP_NAMESPACE,
    MARIADB_CLUSTER_STATE_CONFIGMAP,
)

# Exit successfully right away when the bootstrap is already recorded.
if ics.is_completed:
    LOG.info("The initial bootstrap was completed, skipping wait...")
    sys.exit(0)

LOG.info("Checking for mariadb cluster state.")
def is_mariadb_stabe():
    """Check whether the wsrep status variables report a healthy cluster.

    Connects to ``MARIADB_HOST``, reads a fixed set of ``wsrep_*`` global
    status variables and compares each with its expected value
    (``wsrep_cluster_size`` is compared with ``str(MARIADB_REPLICAS)``).

    Returns:
        True when every expected variable is present with the expected
        value. False on any mismatch, any missing variable, or any exception
        (which is logged).
    """
    wsrep_OK = {
        "wsrep_ready": "ON",
        "wsrep_connected": "ON",
        "wsrep_cluster_status": "Primary",
        "wsrep_local_state_comment": "Synced",
        "wsrep_cluster_size": str(MARIADB_REPLICAS),
    }
    # Only the constant keys above are interpolated into the statement.
    wsrep_vars = ",".join(f"'{var}'" for var in wsrep_OK)
    try:
        connection = pymysql.connect(
            host=MARIADB_HOST,
            password=MARIADB_PASSWORD,
            read_default_file="/etc/mysql/admin_user.cnf",
        )
        try:
            db_cursor = connection.cursor()
            db_cursor.execute(
                f"SHOW GLOBAL STATUS WHERE Variable_name IN ({wsrep_vars})"
            )
            rows = db_cursor.fetchall()
        finally:
            # This function is polled in a loop; close the connection every
            # time so connections do not pile up.
            connection.close()

        observed = dict(rows)
        # Check from the expected side so that a variable missing from the
        # result counts as a failure (shown with value None) instead of
        # silently passing.
        diff = {
            (name, observed.get(name))
            for name, expected in wsrep_OK.items()
            if observed.get(name) != expected
        }
        if diff:
            LOG.error(f"The wsrep is not OK: {diff}")
        else:
            LOG.info("The wspep is ready")
            return True
    except Exception as e:
        LOG.exception(f"Got exception while checking state. {e}")
    return False
# Number of consecutive successful stability checks observed so far. Starts
# at 0 so the first run needs the same number of successes as a run after a
# reset.
stable_for = 0

# Poll until the cluster has been stable for MARIADB_CLUSTER_STABILITY_COUNT
# consecutive checks, then record completion and exit successfully.
while True:
    if not is_mariadb_stabe():
        LOG.info("Resetting stable_for count.")
        stable_for = 0
        LOG.info(f"Sleeping for {MARIADB_CLUSTER_CHECK_WAIT}")
        time.sleep(MARIADB_CLUSTER_CHECK_WAIT)
        continue

    stable_for += 1
    LOG.info(
        f"The cluster is stable for {stable_for} out of {MARIADB_CLUSTER_STABILITY_COUNT}"
    )
    # ">=" rather than "==": with a threshold of 1 or less the counter could
    # otherwise step past it and never match.
    if stable_for >= MARIADB_CLUSTER_STABILITY_COUNT:
        ics.complete()
        sys.exit(0)

    LOG.info(f"Sleeping for {MARIADB_CLUSTER_STABILITY_WAIT}")
    time.sleep(MARIADB_CLUSTER_STABILITY_WAIT)