diff --git a/charms/sunbeam-clusterd/src/charm.py b/charms/sunbeam-clusterd/src/charm.py
index 115028b1..0aa4a234 100755
--- a/charms/sunbeam-clusterd/src/charm.py
+++ b/charms/sunbeam-clusterd/src/charm.py
@@ -29,6 +29,7 @@ from pathlib import (
 import clusterd
 import ops.framework
 import ops_sunbeam.charm as sunbeam_charm
+import ops_sunbeam.guard as sunbeam_guard
 import requests
 import tenacity
 from charms.operator_libs_linux.v2 import (
@@ -50,6 +51,10 @@ from relation_handlers import (
 logger = logging.getLogger(__name__)
 
 
+def _identity(x: bool) -> bool:
+    return x
+
+
 class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
     """Charm the service."""
 
@@ -94,6 +99,9 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
             event.defer()
             return
         self.clusterd_ready()
+        self.status.set(
+            ops.WaitingStatus("Waiting for clusterd initialization")
+        )
 
     def _on_stop(self, event: ops.StopEvent) -> None:
         """Handle stop event."""
@@ -183,9 +191,9 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
     def configure_unit(self, event: ops.EventBase):
         """Configure unit."""
         super().configure_unit(event)
-        self.ensure_snap_present()
         if isinstance(event, ClusterdRemoveNodeEvent):
             self.remove_node_from_cluster(event)
+        self.ensure_snap_present()
         config = self.model.config.get
         snap_data = {
             "daemon.debug": config("debug", False),
@@ -216,7 +224,7 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
         stop=tenacity.stop_after_attempt(10),
         retry=(
             tenacity.retry_if_exception_type(clusterd.ClusterdUnavailableError)
-            | tenacity.retry_if_not_result(lambda result: result)
+            | tenacity.retry_if_not_result(_identity)
         ),
         after=tenacity.after_log(logger, logging.WARNING),
         wait=tenacity.wait_exponential(multiplier=1, min=1, max=30),
@@ -234,6 +242,7 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
             self.unit.name.replace("/", "-"),
             self._binding_address() + ":" + str(self.clusterd_port),
         )
+        self.status.set(ops.ActiveStatus())
 
     def add_node_to_cluster(self, event: ClusterdNewNodeEvent) -> None:
         """Generate token for node joining."""
@@ -265,46 +274,111 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
             return
 
         self_departing = event.departing_unit.name == self.unit.name
-        departing_key = f"{event.departing_unit.name}.join_token"
-        unit_name = event.departing_unit.name.replace("/", "-")
+        already_left = self._wait_for_roles_to_settle_before_removal(
+            event, self_departing
+        )
+        self.status.set(ops.ActiveStatus())
+        if already_left:
+            return
         logger.debug(f"Departing unit: {event.departing_unit.name}")
+        self._remove_member_from_cluster(event.departing_unit.name)
+        if self.model.unit.is_leader():
+            departing_key = f"{event.departing_unit.name}.join_token"
+            self.peers.interface._app_data_bag.pop(
+                departing_key,
+                None,
+            )
+        if self_departing:
+            self.status.set(ops.WaitingStatus("Waiting for removal"))
+            member_left = self._wait_until_local_member_left_cluster()
+            if member_left:
+                return
+            logger.warning(
+                "Member %s has not left the cluster yet",
+                event.departing_unit.name,
+            )
+            event.defer()
+            raise sunbeam_guard.WaitingExceptionError(
+                "Waiting for member to leave cluster"
+            )
+        self.status.set(ops.WaitingStatus("Waiting for roles to settle"))
+        if not self._wait_until_roles_are_settled():
+            logger.debug("Roles not settled yet")
+            event.defer()
+            raise sunbeam_guard.WaitingExceptionError(
+                "Waiting for roles to settle"
+            )
+        self.status.set(ops.ActiveStatus())
+
+    def _wait_for_roles_to_settle_before_removal(
+        self, event: ops.EventBase, self_departing: bool
+    ) -> bool:
+        """This method waits for roles to settle before removing a member.
+
+        Returns true if the member has already left the cluster.
+        """
+        if self_departing:
+            # We are the departing unit, and we might be the leader
+            message = "Waiting for roles to settle before leaving cluster"
+        else:
+            # We are the leader, not the departing unit
+            message = "Waiting for roles to settle before removing member"
+        self.status.set(ops.WaitingStatus(message))
+        # Leaving while the roles are not settled can cause the cluster to
+        # be in an inconsistent state. So we wait until the roles are
+        # settled before leaving.
         try:
-            logger.debug(f"Removing member {unit_name}")
-            self._clusterd.remove_node(unit_name, allow_not_found=True)
+            if not self._wait_until_roles_are_settled():
+                logger.debug("Roles not settled yet")
+                event.defer()
+                raise sunbeam_guard.WaitingExceptionError(message)
+        except requests.exceptions.HTTPError as e:
+            if (
+                e.response is not None
+                and "Daemon not yet initialized" in e.response.text
+            ):
+                if self_departing:
+                    logger.debug("Member already left cluster")
+                    return True
+        return False
+
+    def _remove_member_from_cluster(self, departing_unit: str):
+        """Helper method to remove a member from the cluster."""
+        member_name = departing_unit.replace("/", "-")
+        self_departing = departing_unit == self.unit.name
+        try:
+            logger.debug(f"Removing member {member_name}")
+            self._clusterd.remove_node(
+                member_name,
+                force=True,
+                allow_not_found=True,
+            )
         except clusterd.ClusterdUnavailableError as e:
             if "Remote end closed connection without response" in str(e):
                 logger.debug(
                     "Forwarded request failed, most likely because member was leader"
                     " and this member was removed."
                 )
-                return
-            if self_departing:
-                logger.debug(
-                    "Happened during self removal, ignoring. Error: %s", e
-                )
-                return
+            else:
+                raise e
         except requests.exceptions.HTTPError as e:
-            is_503 = e.response is not None and e.response.status_code == 503
-            if self_departing and is_503:
-                logger.debug(
-                    "Clusterd is not initialized, most likely because"
-                    " leader has already removed this unit from clusterd."
-                    " Error: %s",
-                    e.response.text,
-                )
-                return
-            raise e
-        finally:
-            departing_key = f"{event.departing_unit.name}.join_token"
-            if self.unit.is_leader():
-                self.peers.interface._app_data_bag.pop(
-                    departing_key,
-                    None,
-                )
+            if e.response is None:
+                raise e
+            is_503 = e.response.status_code == 503
+            is_500 = e.response.status_code == 500
+            if not self_departing or not (is_503 or is_500):
+                raise e
+            logger.debug(
+                "Clusterd is not initialized, most likely because"
+                " leader has already removed this unit from clusterd."
+                " Error: %s",
+                e.response.text,  # type: ignore
+            )
 
     def join_node_to_cluster(self, event: ClusterdNodeAddedEvent) -> None:
         """Join node to cluster."""
+        self.status.set(ops.MaintenanceStatus("Joining cluster"))
         token = self.peers.get_app_data(f"{self.unit.name}.join_token")
         if token is None:
             logger.warning("No token found for unit %s", self.unit.name)
@@ -327,18 +401,83 @@ class SunbeamClusterdCharm(sunbeam_charm.OSBaseOperatorCharm):
             return
         self.status.set(ops.ActiveStatus())
 
-    @tenacity.retry(
-        wait=tenacity.wait_fixed(5),
-        stop=tenacity.stop_after_delay(300),
-        retry=tenacity.retry_if_not_result(lambda result: result),
-    )
     def _wait_until_role_set(self, name: str) -> bool:
-        member = self._clusterd.get_member(name)
-        role = member.get("role")
-        logger.debug(f"Member {name} role: {role}")
-        if role == "PENDING":
+        @tenacity.retry(
+            wait=tenacity.wait_fixed(5),
+            stop=tenacity.stop_after_delay(300),
+            retry=tenacity.retry_if_not_result(_identity),
+        )
+        def _wait_until_role_set(name: str) -> bool:
+            member = self._clusterd.get_member(name)
+            role = member.get("role")
+            logger.debug(f"Member {name} role: {role}")
+            if role == "PENDING":
+                return False
+            return True
+
+        try:
+            return _wait_until_role_set(name)
+        except tenacity.RetryError:
+            return False
+
+    def _wait_until_roles_are_settled(self) -> bool:
+        """Wait until cluster has odd number of voters."""
+
+        @tenacity.retry(
+            wait=tenacity.wait_fixed(5),
+            stop=tenacity.stop_after_delay(60),
+            retry=tenacity.retry_if_not_result(_identity),
+        )
+        def _wait_until_roles_are_settled() -> bool:
+            members = self._clusterd.get_members()
+            voter = 0
+            for member in members:
+                if member.get("role") == "voter":
+                    voter += 1
+            if voter % 2 == 0:
+                return False
+            return True
+
+        try:
+            return _wait_until_roles_are_settled()
+
+        except tenacity.RetryError:
+            return False
+
+    def _wait_until_local_member_left_cluster(self) -> bool:
+        """Wait until local node has left the cluster."""
+
+        @tenacity.retry(
+            wait=tenacity.wait_fixed(5),
+            stop=tenacity.stop_after_delay(60),
+            retry=tenacity.retry_if_not_result(_identity),
+        )
+        def _wait_until_local_member_left_cluster() -> bool:
+            member_name = self.unit.name.replace("/", "-")
+            try:
+                self._clusterd.get_member(member_name)
+                return False
+            except requests.exceptions.HTTPError as e:
+                if e.response is None:
+                    raise e
+                db_closed = "database is closed" in e.response.text
+                clusterd_not_initialized = (
+                    "Daemon not yet initialized" in e.response.text
+                )
+                if db_closed or clusterd_not_initialized:
+                    logger.debug(
+                        "Clusterd returned a known error while waiting for removal."
+                        " Skipping."
+                        " Error: %s",
+                        e.response.text,
+                    )
+                    return True
+                raise e
+
+        try:
+            return _wait_until_local_member_left_cluster()
+        except tenacity.RetryError:
             return False
-        return True
 
 
 if __name__ == "__main__":  # pragma: nocover
diff --git a/charms/sunbeam-clusterd/src/clusterd.py b/charms/sunbeam-clusterd/src/clusterd.py
index 6a733cf6..f0f7a292 100644
--- a/charms/sunbeam-clusterd/src/clusterd.py
+++ b/charms/sunbeam-clusterd/src/clusterd.py
@@ -135,10 +135,16 @@ class ClusterdClient:
                 return member
         raise ValueError(f"Member {name} not found")
 
-    def remove_node(self, name: str, allow_not_found: bool = True):
+    def remove_node(
+        self,
+        name: str,
+        force: bool = False,
+        allow_not_found: bool = True,
+    ):
         """Delete node."""
+        int_force = 1 if force else 0
         try:
-            self._delete(f"/cluster/1.0/cluster/{name}")
+            self._delete(f"/cluster/1.0/cluster/{name}?force={int_force}")
         except requests.exceptions.HTTPError as e:
             if e.response is None:
                 raise e
diff --git a/tests/local/zaza/sunbeam/charm_tests/clusterd/tests.py b/tests/local/zaza/sunbeam/charm_tests/clusterd/tests.py
index b7ed2ed2..c294219b 100644
--- a/tests/local/zaza/sunbeam/charm_tests/clusterd/tests.py
+++ b/tests/local/zaza/sunbeam/charm_tests/clusterd/tests.py
@@ -16,7 +16,7 @@
 import json
 import logging
 import subprocess
-from random import shuffle, choice
+from random import shuffle
 from typing import Tuple
 
 import requests
@@ -105,9 +105,9 @@ class ClusterdTest(test_utils.BaseCharmTest):
 
     def test_203_scale_down_to_2_units(self):
         """Scale down to 2 units for voter/spare test."""
-        units = self._get_units()
+        leader = model.get_lead_unit_name(self.application_name)
         model.destroy_unit(
-            self.application_name, choice(units), wait_disappear=True
+            self.application_name, leader, wait_disappear=True
         )
         model.block_until_all_units_idle()
 