Handle member removal gracefully
If roles are not settled (voters is not an odd number), leaving the cluster might break things. Waiting for roles to settle before leaving, and afterwards on the leader. Pass force=1 when deleting cluster member. Cluster removal happens when a juju unit is departing. Change-Id: Ic9a7273be36a6cbb117b74bcfd94cef116a3b603
This commit is contained in:
parent
b3b850bd88
commit
abf7310786
@ -29,6 +29,7 @@ from pathlib import (
|
|||||||
import clusterd
|
import clusterd
|
||||||
import ops.framework
|
import ops.framework
|
||||||
import ops_sunbeam.charm as sunbeam_charm
|
import ops_sunbeam.charm as sunbeam_charm
|
||||||
|
import ops_sunbeam.guard as sunbeam_guard
|
||||||
import requests
|
import requests
|
||||||
import tenacity
|
import tenacity
|
||||||
from charms.operator_libs_linux.v2 import (
|
from charms.operator_libs_linux.v2 import (
|
||||||
@ -50,6 +51,10 @@ from relation_handlers import (
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _identity(x: bool) -> bool:
|
||||||
|
return x
|
||||||
|
|
||||||
|
|
||||||
class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
|
class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
|
||||||
"""Charm the service."""
|
"""Charm the service."""
|
||||||
|
|
||||||
@ -94,6 +99,9 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
|
|||||||
event.defer()
|
event.defer()
|
||||||
return
|
return
|
||||||
self.clusterd_ready()
|
self.clusterd_ready()
|
||||||
|
self.status.set(
|
||||||
|
ops.WaitingStatus("Waiting for clusterd initialization")
|
||||||
|
)
|
||||||
|
|
||||||
def _on_stop(self, event: ops.StopEvent) -> None:
|
def _on_stop(self, event: ops.StopEvent) -> None:
|
||||||
"""Handle stop event."""
|
"""Handle stop event."""
|
||||||
@ -183,9 +191,9 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
|
|||||||
def configure_unit(self, event: ops.EventBase):
|
def configure_unit(self, event: ops.EventBase):
|
||||||
"""Configure unit."""
|
"""Configure unit."""
|
||||||
super().configure_unit(event)
|
super().configure_unit(event)
|
||||||
self.ensure_snap_present()
|
|
||||||
if isinstance(event, ClusterdRemoveNodeEvent):
|
if isinstance(event, ClusterdRemoveNodeEvent):
|
||||||
self.remove_node_from_cluster(event)
|
self.remove_node_from_cluster(event)
|
||||||
|
self.ensure_snap_present()
|
||||||
config = self.model.config.get
|
config = self.model.config.get
|
||||||
snap_data = {
|
snap_data = {
|
||||||
"daemon.debug": config("debug", False),
|
"daemon.debug": config("debug", False),
|
||||||
@ -216,7 +224,7 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
|
|||||||
stop=tenacity.stop_after_attempt(10),
|
stop=tenacity.stop_after_attempt(10),
|
||||||
retry=(
|
retry=(
|
||||||
tenacity.retry_if_exception_type(clusterd.ClusterdUnavailableError)
|
tenacity.retry_if_exception_type(clusterd.ClusterdUnavailableError)
|
||||||
| tenacity.retry_if_not_result(lambda result: result)
|
| tenacity.retry_if_not_result(_identity)
|
||||||
),
|
),
|
||||||
after=tenacity.after_log(logger, logging.WARNING),
|
after=tenacity.after_log(logger, logging.WARNING),
|
||||||
wait=tenacity.wait_exponential(multiplier=1, min=1, max=30),
|
wait=tenacity.wait_exponential(multiplier=1, min=1, max=30),
|
||||||
@ -234,6 +242,7 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
|
|||||||
self.unit.name.replace("/", "-"),
|
self.unit.name.replace("/", "-"),
|
||||||
self._binding_address() + ":" + str(self.clusterd_port),
|
self._binding_address() + ":" + str(self.clusterd_port),
|
||||||
)
|
)
|
||||||
|
self.status.set(ops.ActiveStatus())
|
||||||
|
|
||||||
def add_node_to_cluster(self, event: ClusterdNewNodeEvent) -> None:
|
def add_node_to_cluster(self, event: ClusterdNewNodeEvent) -> None:
|
||||||
"""Generate token for node joining."""
|
"""Generate token for node joining."""
|
||||||
@ -265,46 +274,111 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
|
|||||||
return
|
return
|
||||||
|
|
||||||
self_departing = event.departing_unit.name == self.unit.name
|
self_departing = event.departing_unit.name == self.unit.name
|
||||||
departing_key = f"{event.departing_unit.name}.join_token"
|
already_left = self._wait_for_roles_to_settle_before_removal(
|
||||||
unit_name = event.departing_unit.name.replace("/", "-")
|
event, self_departing
|
||||||
|
)
|
||||||
|
self.status.set(ops.ActiveStatus())
|
||||||
|
if already_left:
|
||||||
|
return
|
||||||
|
|
||||||
logger.debug(f"Departing unit: {event.departing_unit.name}")
|
logger.debug(f"Departing unit: {event.departing_unit.name}")
|
||||||
|
self._remove_member_from_cluster(event.departing_unit.name)
|
||||||
|
if self.model.unit.is_leader():
|
||||||
|
departing_key = f"{event.departing_unit.name}.join_token"
|
||||||
|
self.peers.interface._app_data_bag.pop(
|
||||||
|
departing_key,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
if self_departing:
|
||||||
|
self.status.set(ops.WaitingStatus("Waiting for removal"))
|
||||||
|
member_left = self._wait_until_local_member_left_cluster()
|
||||||
|
if member_left:
|
||||||
|
return
|
||||||
|
logger.warning(
|
||||||
|
"Member %s has not left the cluster yet",
|
||||||
|
event.departing_unit.name,
|
||||||
|
)
|
||||||
|
event.defer()
|
||||||
|
raise sunbeam_guard.WaitingExceptionError(
|
||||||
|
"Waiting for member to leave cluster"
|
||||||
|
)
|
||||||
|
self.status.set(ops.WaitingStatus("Waiting for roles to settle"))
|
||||||
|
if not self._wait_until_roles_are_settled():
|
||||||
|
logger.debug("Roles not settled yet")
|
||||||
|
event.defer()
|
||||||
|
raise sunbeam_guard.WaitingExceptionError(
|
||||||
|
"Waiting for roles to settle"
|
||||||
|
)
|
||||||
|
self.status.set(ops.ActiveStatus())
|
||||||
|
|
||||||
|
def _wait_for_roles_to_settle_before_removal(
|
||||||
|
self, event: ops.EventBase, self_departing: bool
|
||||||
|
) -> bool:
|
||||||
|
"""This method waits for rols to settle before removing a member.
|
||||||
|
|
||||||
|
Returns true if the member has already left the cluster.
|
||||||
|
"""
|
||||||
|
if self_departing:
|
||||||
|
# We are the departing unit, and we might be the leader
|
||||||
|
message = "Waiting for roles to settle before leaving cluster"
|
||||||
|
else:
|
||||||
|
# We are the leader, not the departing unit
|
||||||
|
message = "Waiting for roles to settle before removing member"
|
||||||
|
self.status.set(ops.WaitingStatus(message))
|
||||||
|
# Leaving while the roles are not settled can cause the cluster to
|
||||||
|
# be in an inconsistent state. So we wait until the roles are
|
||||||
|
# settled before leaving.
|
||||||
try:
|
try:
|
||||||
logger.debug(f"Removing member {unit_name}")
|
if not self._wait_until_roles_are_settled():
|
||||||
self._clusterd.remove_node(unit_name, allow_not_found=True)
|
logger.debug("Roles not settled yet")
|
||||||
|
event.defer()
|
||||||
|
raise sunbeam_guard.WaitingExceptionError(message)
|
||||||
|
except requests.exceptions.HTTPError as e:
|
||||||
|
if (
|
||||||
|
e.response is not None
|
||||||
|
and "Daemon not yet initialized" in e.response.text
|
||||||
|
):
|
||||||
|
if self_departing:
|
||||||
|
logger.debug("Member already left cluster")
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _remove_member_from_cluster(self, departing_unit: str):
|
||||||
|
"""Helper method to remove a member from the cluster."""
|
||||||
|
member_name = departing_unit.replace("/", "-")
|
||||||
|
self_departing = departing_unit == self.unit.name
|
||||||
|
try:
|
||||||
|
logger.debug(f"Removing member {member_name}")
|
||||||
|
self._clusterd.remove_node(
|
||||||
|
member_name,
|
||||||
|
force=True,
|
||||||
|
allow_not_found=True,
|
||||||
|
)
|
||||||
except clusterd.ClusterdUnavailableError as e:
|
except clusterd.ClusterdUnavailableError as e:
|
||||||
if "Remote end closed connection without response" in str(e):
|
if "Remote end closed connection without response" in str(e):
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Forwarded request failed, most likely because member was leader"
|
"Forwarded request failed, most likely because member was leader"
|
||||||
" and this member was removed."
|
" and this member was removed."
|
||||||
)
|
)
|
||||||
return
|
else:
|
||||||
if self_departing:
|
raise e
|
||||||
logger.debug(
|
|
||||||
"Happened during self removal, ignoring. Error: %s", e
|
|
||||||
)
|
|
||||||
return
|
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
is_503 = e.response is not None and e.response.status_code == 503
|
if e.response is None:
|
||||||
if self_departing and is_503:
|
raise e
|
||||||
logger.debug(
|
is_503 = e.response.status_code == 503
|
||||||
"Clusterd is not initialized, most likely because"
|
is_500 = e.response.status_code == 500
|
||||||
" leader has already removed this unit from clusterd."
|
if not self_departing or not (is_503 or is_500):
|
||||||
" Error: %s",
|
raise e
|
||||||
e.response.text,
|
logger.debug(
|
||||||
)
|
"Clusterd is not initialized, most likely because"
|
||||||
return
|
" leader has already removed this unit from clusterd."
|
||||||
raise e
|
" Error: %s",
|
||||||
finally:
|
e.response.text, # type: ignore
|
||||||
departing_key = f"{event.departing_unit.name}.join_token"
|
)
|
||||||
if self.unit.is_leader():
|
|
||||||
self.peers.interface._app_data_bag.pop(
|
|
||||||
departing_key,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
|
|
||||||
def join_node_to_cluster(self, event: ClusterdNodeAddedEvent) -> None:
|
def join_node_to_cluster(self, event: ClusterdNodeAddedEvent) -> None:
|
||||||
"""Join node to cluster."""
|
"""Join node to cluster."""
|
||||||
|
self.status.set(ops.MaintenanceStatus("Joining cluster"))
|
||||||
token = self.peers.get_app_data(f"{self.unit.name}.join_token")
|
token = self.peers.get_app_data(f"{self.unit.name}.join_token")
|
||||||
if token is None:
|
if token is None:
|
||||||
logger.warning("No token found for unit %s", self.unit.name)
|
logger.warning("No token found for unit %s", self.unit.name)
|
||||||
@ -327,18 +401,83 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
|
|||||||
return
|
return
|
||||||
self.status.set(ops.ActiveStatus())
|
self.status.set(ops.ActiveStatus())
|
||||||
|
|
||||||
@tenacity.retry(
|
|
||||||
wait=tenacity.wait_fixed(5),
|
|
||||||
stop=tenacity.stop_after_delay(300),
|
|
||||||
retry=tenacity.retry_if_not_result(lambda result: result),
|
|
||||||
)
|
|
||||||
def _wait_until_role_set(self, name: str) -> bool:
|
def _wait_until_role_set(self, name: str) -> bool:
|
||||||
member = self._clusterd.get_member(name)
|
@tenacity.retry(
|
||||||
role = member.get("role")
|
wait=tenacity.wait_fixed(5),
|
||||||
logger.debug(f"Member {name} role: {role}")
|
stop=tenacity.stop_after_delay(300),
|
||||||
if role == "PENDING":
|
retry=tenacity.retry_if_not_result(_identity),
|
||||||
|
)
|
||||||
|
def _wait_until_role_set(name: str) -> bool:
|
||||||
|
member = self._clusterd.get_member(name)
|
||||||
|
role = member.get("role")
|
||||||
|
logger.debug(f"Member {name} role: {role}")
|
||||||
|
if role == "PENDING":
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
try:
|
||||||
|
return _wait_until_role_set(name)
|
||||||
|
except tenacity.RetryError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _wait_until_roles_are_settled(self) -> bool:
|
||||||
|
"""Wait until cluster has odd number of voters."""
|
||||||
|
|
||||||
|
@tenacity.retry(
|
||||||
|
wait=tenacity.wait_fixed(5),
|
||||||
|
stop=tenacity.stop_after_delay(60),
|
||||||
|
retry=tenacity.retry_if_not_result(_identity),
|
||||||
|
)
|
||||||
|
def _wait_until_roles_are_settled() -> bool:
|
||||||
|
members = self._clusterd.get_members()
|
||||||
|
voter = 0
|
||||||
|
for member in members:
|
||||||
|
if member.get("role") == "voter":
|
||||||
|
voter += 1
|
||||||
|
if voter % 2 == 0:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
try:
|
||||||
|
return _wait_until_roles_are_settled()
|
||||||
|
|
||||||
|
except tenacity.RetryError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _wait_until_local_member_left_cluster(self) -> bool:
|
||||||
|
"""Wait until local node has left the cluster."""
|
||||||
|
|
||||||
|
@tenacity.retry(
|
||||||
|
wait=tenacity.wait_fixed(5),
|
||||||
|
stop=tenacity.stop_after_delay(60),
|
||||||
|
retry=tenacity.retry_if_not_result(_identity),
|
||||||
|
)
|
||||||
|
def _wait_until_local_member_left_cluster() -> bool:
|
||||||
|
member_name = self.unit.name.replace("/", "-")
|
||||||
|
try:
|
||||||
|
self._clusterd.get_member(member_name)
|
||||||
|
return False
|
||||||
|
except requests.exceptions.HTTPError as e:
|
||||||
|
if e.response is None:
|
||||||
|
raise e
|
||||||
|
db_closed = "database is closed" in e.response.text
|
||||||
|
clusterd_not_initialized = (
|
||||||
|
"Daemon not yet initialized" in e.response.text
|
||||||
|
)
|
||||||
|
if db_closed or clusterd_not_initialized:
|
||||||
|
logger.debug(
|
||||||
|
"Clusterd returned a known error while waiting for removal."
|
||||||
|
". Skipping."
|
||||||
|
" Error: %s",
|
||||||
|
e.response.text,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
raise e
|
||||||
|
|
||||||
|
try:
|
||||||
|
return _wait_until_local_member_left_cluster()
|
||||||
|
except tenacity.RetryError:
|
||||||
return False
|
return False
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__": # pragma: nocover
|
if __name__ == "__main__": # pragma: nocover
|
||||||
|
@ -135,10 +135,16 @@ class ClusterdClient:
|
|||||||
return member
|
return member
|
||||||
raise ValueError(f"Member {name} not found")
|
raise ValueError(f"Member {name} not found")
|
||||||
|
|
||||||
def remove_node(self, name: str, allow_not_found: bool = True):
|
def remove_node(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
force: bool = False,
|
||||||
|
allow_not_found: bool = True,
|
||||||
|
):
|
||||||
"""Delete node."""
|
"""Delete node."""
|
||||||
|
int_force = 1 if force else 0
|
||||||
try:
|
try:
|
||||||
self._delete(f"/cluster/1.0/cluster/{name}")
|
self._delete(f"/cluster/1.0/cluster/{name}?force={int_force}")
|
||||||
except requests.exceptions.HTTPError as e:
|
except requests.exceptions.HTTPError as e:
|
||||||
if e.response is None:
|
if e.response is None:
|
||||||
raise e
|
raise e
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import subprocess
|
import subprocess
|
||||||
from random import shuffle, choice
|
from random import shuffle
|
||||||
from typing import Tuple
|
from typing import Tuple
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
@ -105,9 +105,9 @@ class ClusterdTest(test_utils.BaseCharmTest):
|
|||||||
|
|
||||||
def test_203_scale_down_to_2_units(self):
|
def test_203_scale_down_to_2_units(self):
|
||||||
"""Scale down to 2 units for voter/spare test."""
|
"""Scale down to 2 units for voter/spare test."""
|
||||||
units = self._get_units()
|
leader = model.get_lead_unit_name(self.application_name)
|
||||||
model.destroy_unit(
|
model.destroy_unit(
|
||||||
self.application_name, choice(units), wait_disappear=True
|
self.application_name, leader, wait_disappear=True
|
||||||
)
|
)
|
||||||
model.block_until_all_units_idle()
|
model.block_until_all_units_idle()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user