Implementation of ovn-central downscaling.

This change includes:
* attempt to leave cluster gracefully when removing unit
* cluster-status action that shows status of SB and NB clusters
* cluster-kick action that allows user to remove cluster members

Associated spec: https://opendev.org/openstack/charm-specs/src/branch/master/specs/yoga/approved/ovn-central-downscaling.rst

Closes-Bug: #1948680
func-test-pr: https://github.com/openstack-charmers/zaza-openstack-tests/pull/933
Change-Id: I40ae08669d00b3b1fa567a45db2ce51425e6d1cb
This commit is contained in:
Martin Kalcok 2022-09-28 22:34:54 +02:00
parent 28d3bc6492
commit 34a82bc763
10 changed files with 1224 additions and 4 deletions

View File

@ -25,3 +25,43 @@ run-deferred-hooks:
show-deferred-events:
descrpition: |
Show the outstanding restarts
cluster-status:
description: |
Show status of an OVN cluster. Action result will contain two keys,
"ovnsb" and "ovnnb", each of these keys will contain yaml structure with data
from "ovn-appctl cluster/status" command representing status of Southbound and
Northbound clusters. Additional "unit_map" key is included in each cluster status
that pairs server IDs of cluster members with unit IDs on which these servers run.
In case the action finds servers in cluster that are not associated with any known
unit, the "unit_map" will also include key "UNKNOWN" with list of these
disassociated servers.
cluster-kick:
description: |
Request removal of a server from the cluster. This action is equivalent to running
"ovn-appctl cluster/kick" command and can be run on any unit connected to the
cluster. This action takes ID of a server in southbound or northbound cluster
(or both) as an argument. At least one of these arguments must be specified. To get
the list of servers (and their IDs) connected to the cluster, user can run
"cluster-status" action.
params:
sb-server-id:
type:
- string
- number
default: ""
description: |
ID of a server to kick from Southbound cluster
nb-server-id:
type:
- string
- number
default: ""
description: |
ID of a server to kick from Northbound cluster
i-really-mean-it:
type: boolean
description: |
Confirmation by user to really perform this destructive action
required:
- i-really-mean-it

1
src/actions/cluster-kick Symbolic link
View File

@ -0,0 +1 @@
cluster.py

1
src/actions/cluster-status Symbolic link
View File

@ -0,0 +1 @@
cluster.py

249
src/actions/cluster.py Executable file
View File

@ -0,0 +1,249 @@
#!/usr/bin/env python3
# Copyright 2022 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import yaml
import subprocess
# Load modules from $CHARM_DIR/lib
sys.path.append("lib")
from charms.layer import basic
basic.bootstrap_charm_deps()
import charms_openstack.bus
import charms_openstack.charm
import charms.reactive as reactive
import charmhelpers.core as ch_core
import charmhelpers.contrib.network.ovs.ovn as ch_ovn
import charmhelpers.contrib.network.ip as ch_ip
charms_openstack.bus.discover()
class StatusParsingException(Exception):
"""Exception when OVN cluster status has unexpected format/values."""
def _url_to_ip(cluster_url):
"""Parse IP from cluster URL.
OVN cluster uses urls like "ssl:10.0.0.1:6644". This function parses the
IP portion out of the url. This function works with IPv4 and IPv6
addresses.
:raises StatusParsingException: If cluster_url does not contain valid IP
address.
:param cluster_url: OVN server url. Like "ssl:10.0.0.1".
:type cluster_url: str
:return: Parsed out IP address
:rtype: str
"""
ip_portion = cluster_url.split(":")[1:-1]
if len(ip_portion) > 1:
# Possible IPv6 address
ip_str = ":".join(ip_portion)
else:
# Likely a IPv4 address
ip_str = "".join(ip_portion)
if not ch_ip.is_ip(ip_str):
raise StatusParsingException(
"Failed to parse OVN cluster status. Cluster member address "
"has unexpected format: {}".format(cluster_url)
)
return ip_str
def _format_cluster_status(raw_cluster_status, cluster_ip_map):
"""Reformat cluster status into dict.
Resulting dictionary also includes mapping between cluster servers and
juju units.
Parameter cluster_ip_map is a dictionary with juju unit IDs as a key and
their respective IP addresses as a value. Example:
{"ovn-central/0": "10.0.0.1", "ovn-central/1: "10.0.0.2"}
:raises StatusParsingException: In case the parsing of a cluster status
fails.
:param raw_cluster_status: Cluster status object
:type raw_cluster_status: ch_ovn.OVNClusterStatus
:param cluster_ip_map: mapping between juju units and their IPs in the
cluster.
:type cluster_ip_map: dict
:return: Cluster status in the form of dictionary
:rtype: dict
"""
mapped_servers = {}
unknown_servers = []
# Map unit name to each server in the Servers field.
for server_id, server_url in raw_cluster_status.servers:
member_address = _url_to_ip(server_url)
for unit, ip in cluster_ip_map.items():
if member_address == ip:
mapped_servers[unit] = server_id
break
else:
unknown_servers.append(server_id)
cluster = raw_cluster_status.to_yaml()
if unknown_servers:
mapped_servers["UNKNOWN"] = unknown_servers
cluster["unit_map"] = mapped_servers
return cluster
def _cluster_ip_map():
"""Produce mapping between units and their IPs.
This function selects an IP bound to the ovsdb-peer endpoint.
Example output: {"ovn-central/0": "10.0.0.1", ...}
"""
# Existence of ovsdb-peer relation is guaranteed by check in the main func
ovsdb_peers = reactive.endpoint_from_flag("ovsdb-peer.available")
local_unit_id = ch_core.hookenv.local_unit()
local_ip = ovsdb_peers.cluster_local_addr
unit_map = {local_unit_id: local_ip}
for relation in ovsdb_peers.relations:
for unit in relation.units:
try:
address = unit.received.get("bound-address", "")
unit_map[unit.unit_name] = address
except ValueError:
pass
return unit_map
def _kick_server(cluster, server_id):
"""Perform ovn-appctl cluster/kick to remove server from selected cluster.
:raises:
subprocess.CalledProcessError: If subprocess command execution fails.
ValueError: If cluster parameter doesn't have an expected value.
:param cluster: Cluster from which the server should be kicked. Available
options are "northbound" or "southbound"
:type cluster: str
:param server_id: short ID of a server to be kicked
:type server_id: str
:return: None
"""
if cluster.lower() == "southbound":
params = ("ovnsb_db", ("cluster/kick", "OVN_Southbound", server_id))
elif cluster.lower() == "northbound":
params = ("ovnnb_db", ("cluster/kick", "OVN_Northbound", server_id))
else:
raise ValueError(
"Unexpected value of 'cluster' parameter: '{}'".format(cluster)
)
ch_ovn.ovn_appctl(*params)
def cluster_status():
"""Implementation of a "cluster-status" action."""
with charms_openstack.charm.provide_charm_instance() as charm_instance:
sb_status = charm_instance.cluster_status("ovnsb_db")
nb_status = charm_instance.cluster_status("ovnnb_db")
try:
unit_ip_map = _cluster_ip_map()
sb_cluster = _format_cluster_status(sb_status, unit_ip_map)
nb_cluster = _format_cluster_status(nb_status, unit_ip_map)
except StatusParsingException as exc:
ch_core.hookenv.action_fail(str(exc))
return
ch_core.hookenv.action_set(
{"ovnsb": yaml.safe_dump(sb_cluster, sort_keys=False)}
)
ch_core.hookenv.action_set(
{"ovnnb": yaml.safe_dump(nb_cluster, sort_keys=False)}
)
def cluster_kick():
"""Implementation of a "cluster-kick" action."""
sb_server_id = str(ch_core.hookenv.action_get("sb-server-id"))
nb_server_id = str(ch_core.hookenv.action_get("nb-server-id"))
if not (sb_server_id or nb_server_id):
ch_core.hookenv.action_fail(
"At least one server ID to kick must be specified."
)
return
if sb_server_id:
try:
_kick_server("southbound", sb_server_id)
ch_core.hookenv.action_set(
{"ovnsb": "requested kick of {}".format(sb_server_id)}
)
except subprocess.CalledProcessError as exc:
ch_core.hookenv.action_fail(
"Failed to kick Southbound cluster member "
"{}: {}".format(sb_server_id, exc.output)
)
if nb_server_id:
try:
_kick_server("northbound", nb_server_id)
ch_core.hookenv.action_set(
{"ovnnb": "requested kick of {}".format(nb_server_id)}
)
except subprocess.CalledProcessError as exc:
ch_core.hookenv.action_fail(
"Failed to kick Northbound cluster member "
"{}: {}".format(nb_server_id, exc.output)
)
ACTIONS = {"cluster-status": cluster_status, "cluster-kick": cluster_kick}
def main(args):
ch_core.hookenv._run_atstart()
# Abort action if this unit is not in a cluster.
if reactive.endpoint_from_flag("ovsdb-peer.available") is None:
ch_core.hookenv.action_fail("Unit is not part of an OVN cluster.")
return
action_name = os.path.basename(args[0])
try:
action = ACTIONS[action_name]
except KeyError:
return "Action %s undefined" % action_name
else:
try:
action()
except Exception as e:
ch_core.hookenv.action_fail(str(e))
ch_core.hookenv._run_atexit()
if __name__ == "__main__":
sys.exit(main(sys.argv))

View File

@ -816,6 +816,107 @@ class BaseOVNCentralCharm(charms_openstack.charm.OpenStackCharm):
snap.install('prometheus-ovn-exporter', channel=channel,
devmode=True)
@staticmethod
def leave_cluster():
"""Run commands to remove servers running on this unit from cluster.
In case the commands fail, an ERROR message will be logged.
:return: None
:rtype: None
"""
try:
ch_core.hookenv.log(
"Removing self from Southbound cluster",
ch_core.hookenv.INFO
)
ch_ovn.ovn_appctl("ovnsb_db", ("cluster/leave", "OVN_Southbound"))
except subprocess.CalledProcessError:
ch_core.hookenv.log(
"Failed to leave Southbound cluster. You can use "
"'cluster-kick' juju action on remaining units to "
"remove lingering cluster members.",
ch_core.hookenv.ERROR
)
try:
ch_core.hookenv.log(
"Removing self from Northbound cluster",
ch_core.hookenv.INFO
)
ch_ovn.ovn_appctl("ovnnb_db", ("cluster/leave", "OVN_Northbound"))
except subprocess.CalledProcessError:
ch_core.hookenv.log(
"Failed to leave Northbound cluster. You can use "
"'cluster-kick' juju action on remaining units to "
"remove lingering cluster members.",
ch_core.hookenv.ERROR
)
@staticmethod
def is_server_in_cluster(server_ip, cluster_status):
"""Parse cluster status and find if server with given IP is part of it.
:param server_ip: IP of a server to search.
:type server_ip: str
:param cluster_status: Cluster status to parse.
:type cluster_status: ch_ovn.OVNClusterStatus
:return: True if server is part of the cluster. Otherwise, False.
:rtype: bool
"""
remote_unit_url = "ssl:{}:".format(server_ip)
return any(
list(server)[1].startswith(remote_unit_url)
for server in cluster_status.servers
)
def wait_for_server_leave(self, server_ip, timeout=30):
"""Wait for servers with specified IP to leave SB and NB clusters.
:param server_ip: IP of the server that should no longer be part of
the clusters.
:type server_ip: str
:param timeout: How many seconds should this function wait for the
servers to leave. The timeout should be an increment of 5.
:return: True if servers from selected unit departed within the
timeout window. Otherwise, it returns False.
:rtype: bool
"""
tick = 5
timer = 0
unit_in_sb_cluster = unit_in_nb_cluster = True
servers_left = False
wait_sb_msg = "Waiting for {} to leave Southbound cluster".format(
server_ip
)
wait_nb_msg = "Waiting for {} to leave Northbound cluster".format(
server_ip
)
while timer < timeout:
if unit_in_sb_cluster:
ch_core.hookenv.log(wait_sb_msg, ch_core.hookenv.INFO)
unit_in_sb_cluster = self.is_server_in_cluster(
server_ip,
self.cluster_status("ovnsb_db")
)
if unit_in_nb_cluster:
ch_core.hookenv.log(wait_nb_msg, ch_core.hookenv.INFO)
unit_in_nb_cluster = self.is_server_in_cluster(
server_ip,
self.cluster_status("ovnnb_db")
)
if not unit_in_sb_cluster and not unit_in_nb_cluster:
servers_left = True
ch_core.hookenv.log(
"{} servers left Northbound and Southbound OVN "
"clusters.".format(server_ip),
ch_core.hookenv.INFO
)
break
time.sleep(tick)
timer += tick
return servers_left
class TrainOVNCentralCharm(BaseOVNCentralCharm):
# OpenvSwitch and OVN is distributed as part of the Ubuntu Cloud Archive

View File

@ -19,6 +19,8 @@ import charms.coordinator as coordinator
import charms_openstack.bus
import charms_openstack.charm as charm
from charmhelpers.core import hookenv
charms_openstack.bus.discover()
@ -155,7 +157,7 @@ def enable_default_certificates():
charm.use_defaults('certificates.available')
@reactive.when_none('is-update-status-hook')
@reactive.when_none('is-update-status-hook', 'endpoint.ovsdb-peer.departed')
@reactive.when('ovsdb-peer.available')
def configure_firewall():
ovsdb_peer = reactive.endpoint_from_flag('ovsdb-peer.available')
@ -220,7 +222,8 @@ def maybe_do_upgrade():
'coordinator.granted.upgrade',
'coordinator.requested.upgrade',
'config.changed.source',
'config.changed.ovn-source')
'config.changed.ovn-source',
'endpoint.ovsdb-peer.departed')
@reactive.when('ovsdb-peer.available',
'leadership.set.nb_cid',
'leadership.set.sb_cid',
@ -322,3 +325,54 @@ def maybe_clear_metrics_endpoint():
return
metrics_endpoint.clear_job(job_name)
@reactive.when('endpoint.ovsdb-peer.departed')
def handle_cluster_downscale():
"""Handle OVN cluster's downscaling when unit is removed.
There are two branches of code in this function. If it's executed on a
unit that is being removed, It should trigger "cluster/leave" message.
If, on the other hand, this code is executed on a unit that's remaining,
it should wait before the departing unit can send out the "cluster/leave"
command before reconfiguring firewall and closing off ports.
"""
if reactive.is_flag_set("ovsdb-peer.left_cluster"):
# Departing unit already left cluster
hookenv.log("Servers already left the cluster.", hookenv.INFO)
return
departing_unit = hookenv.departing_unit()
is_departing_unit = hookenv.local_unit() == departing_unit
if is_departing_unit:
# Departing unit must attempt to gracefully leave OVN cluster.
with charm.provide_charm_instance() as ovn:
ovn.leave_cluster()
reactive.set_flag("ovsdb-peer.left_cluster")
else:
# unit that remains in cluster should wait for departing unit to
# gracefully leave cluster before reconfiguring firewall
peers = reactive.endpoint_from_name("ovsdb-peer")
remote_unit_ip = peers.all_departed_units[
departing_unit
].received["bound-address"]
with charm.provide_charm_instance() as ovn:
departed = ovn.wait_for_server_leave(remote_unit_ip)
if departed:
hookenv.log(
"Departing unit {} successfully disconnected from "
"cluster.".format(departing_unit),
hookenv.INFO
)
else:
hookenv.log(
"Departing unit {} failed to remove itself from cluster. "
"Please use action `cluster-kick` to remove straggling "
"servers from OVN cluster.".format(departing_unit),
hookenv.WARNING
)
configure_firewall()

View File

@ -30,6 +30,7 @@ configure:
tests:
- zaza.openstack.charm_tests.ovn.tests.OVNCentralDeferredRestartTest
- zaza.openstack.charm_tests.ovn.tests.CentralCharmOperationTest
- zaza.openstack.charm_tests.ovn.tests.OVNCentralDownscaleTests
tests_options:
force_deploy:

View File

@ -0,0 +1,550 @@
# Copyright 2022 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import deepcopy
from unittest import TestCase
from unittest.mock import MagicMock, patch, call
import yaml
import actions.cluster as cluster_actions
class ClusterActionTests(TestCase):
UNIT_MAPPING = {
"ovn-central/0": {"id": "aa11", "address": "ssl:10.0.0.1:6644"},
"ovn-central/1": {"id": "bb22", "address": "ssl:10.0.0.2:6644"},
"ovn-central/2": {"id": "cc33", "address": "ssl:10.0.0.3:6644"},
}
@property
def servers(self):
"""Return list of tuples representing servers in cluster.
This property uses data from self.UNIT_MAPPING to produce output
similar to that of OVNClusterStatus.servers attribute.
:rtype: List[Tuple(str, str)]
"""
servers = []
for server in self.UNIT_MAPPING.values():
servers.append((server["id"], server["address"]))
return servers
@property
def unit_ip_map(self):
"""Return mapping between unit names and their IPs.
This property uses data from self.UNIT_MAPPING.
:rtype: Dict[str, str]
"""
unit_map = {}
for unit, data in self.UNIT_MAPPING.items():
unit_map[unit] = data["address"].split(":")[1]
return unit_map
@property
def unit_id_map(self):
"""Return mapping between unit names and their IDs.
This property uses data from self.UNIT_MAPPING.
:rtype: Dict[str, str]
"""
unit_map = {}
for unit, data in self.UNIT_MAPPING.items():
unit_map[unit] = data["id"]
return unit_map
def setUp(self):
"""Setup and clean up frequent mocks."""
super().setUp()
mocks = [
patch.object(cluster_actions.ch_core.hookenv, "action_get"),
patch.object(cluster_actions.ch_core.hookenv, "action_set"),
patch.object(cluster_actions.ch_core.hookenv, "action_fail"),
patch.object(cluster_actions.ch_ovn, "ovn_appctl"),
]
for mock in mocks:
mock.start()
self.addCleanup(mock.stop)
# Mock actions mapped in the cluster.py otherwise they'd refer
# to non-mocked functions.
self.mapped_action_cluster_kick = MagicMock()
self.mapped_action_cluster_status = MagicMock()
cluster_actions.ACTIONS[
"cluster-kick"
] = self.mapped_action_cluster_kick
cluster_actions.ACTIONS[
"cluster-status"
] = self.mapped_action_cluster_status
def test_url_to_ip(self):
"""Test function that parses IPs out of server URLs."""
valid_ipv4 = "10.0.0.1"
valid_ipv6 = "2001:db8:3333:4444:5555:6666:7777:8888"
invalid_addr = "foo"
url = "ssl:{}:6644"
# Parse valid IPv4
ipv4 = cluster_actions._url_to_ip(url.format(valid_ipv4))
self.assertEquals(ipv4, valid_ipv4)
# Parse valid IPv6
ipv6 = cluster_actions._url_to_ip(url.format(valid_ipv6))
self.assertEquals(ipv6, valid_ipv6)
# Parse invalid url
cluster_actions.ch_ip.is_ip.return_value = False
with self.assertRaises(cluster_actions.StatusParsingException):
cluster_actions._url_to_ip(url.format(invalid_addr))
@patch.object(cluster_actions.ch_ovn, 'OVNClusterStatus')
def test_format_cluster_status(self, mock_cluster_status):
"""Test turning OVNClusterStatus into dict.
Resulting dict also contains additional info mapping cluster servers
to the juju units.
"""
sample_data = {"cluster_id": "11aa", "servers": self.servers}
mock_cluster_status.to_yaml.return_value = sample_data
mock_cluster_status.servers = self.servers
cluster_status = cluster_actions._format_cluster_status(
mock_cluster_status, self.unit_ip_map
)
# Compare resulting dict with expected data
expected_status = sample_data.copy()
expected_status["unit_map"] = self.unit_id_map
self.assertEquals(cluster_status, expected_status)
@patch.object(cluster_actions.ch_ovn, 'OVNClusterStatus')
def test_format_cluster_status_missing_server(self, mock_cluster_status):
"""Test turning OVNClusterStatus into dict with a missing server.
This use-case happens when OVN cluster reports server that does not run
on active ovn-central unit. For example, if server ran on unit that was
destroyed and did not leave cluster gracefully. in such case, resulting
status shows "Unit" attribute of this server as "UNKNOWN"
"""
missing_server_id = "ff99"
missing_server_ip = "10.0.0.99"
missing_server_url = "ssl:{}:6644".format(missing_server_ip)
servers = self.servers.copy()
servers.append((missing_server_id, missing_server_url))
sample_data = {"cluster_id": "11aa", "servers": servers}
mock_cluster_status.to_yaml.return_value = sample_data
mock_cluster_status.servers = servers
cluster_status = cluster_actions._format_cluster_status(
mock_cluster_status, self.unit_ip_map
)
# Compare resulting dict with expected data
expected_status = sample_data.copy()
expected_status["unit_map"] = self.unit_id_map
expected_status["unit_map"]["UNKNOWN"] = [missing_server_id]
self.assertEquals(cluster_status, expected_status)
@patch.object(cluster_actions.ch_ovn, 'OVNClusterStatus')
@patch.object(cluster_actions, "_url_to_ip")
def test_format_cluster_parsing_failure(
self,
mock_url_to_ip,
mock_cluster_status
):
"""Test failure to parse status with format_cluster_status()."""
sample_data = {"cluster_id": "11aa", "servers": self.servers}
mock_cluster_status.to_yaml.return_value = sample_data
mock_cluster_status.servers = self.servers
mock_url_to_ip.side_effect = cluster_actions.StatusParsingException
with self.assertRaises(cluster_actions.StatusParsingException):
cluster_actions._format_cluster_status(
mock_cluster_status, self.unit_ip_map
)
@patch.object(cluster_actions.reactive, "endpoint_from_flag")
@patch.object(cluster_actions.ch_core.hookenv, "local_unit")
def test_cluster_ip_map(self, mock_local_unit, mock_endpoint_from_flag):
"""Test generating map of unit IDs and their IPs."""
expected_map = {}
remote_unit_data = deepcopy(self.UNIT_MAPPING)
remote_units = []
local_unit_name = "ovn-central/0"
local_unit_data = remote_unit_data.pop(local_unit_name)
for unit_name, data in remote_unit_data.items():
_, ip, _ = data["address"].split(":")
unit = MagicMock()
unit.unit_name = unit_name
unit.received = {"bound-address": ip}
remote_units.append(unit)
expected_map[unit_name] = ip
_, local_unit_ip, _ = local_unit_data["address"].split(":")
expected_map[local_unit_name] = local_unit_ip
endpoint = MagicMock()
relation = MagicMock()
relation.units = remote_units
endpoint.relations = [relation]
endpoint.cluster_local_addr = local_unit_ip
mock_local_unit.return_value = local_unit_name
mock_endpoint_from_flag.return_value = endpoint
unit_mapping = cluster_actions._cluster_ip_map()
self.assertEquals(unit_mapping, expected_map)
def test_kick_server_success(self):
"""Test successfully kicking server from cluster"""
server_id = "aa11"
expected_sb_call = (
"ovnsb_db",
("cluster/kick", "OVN_Southbound", server_id)
)
expected_nb_call = (
"ovnnb_db",
("cluster/kick", "OVN_Northbound", server_id)
)
# test kick from Southbound cluster
cluster_actions._kick_server("southbound", server_id)
cluster_actions.ch_ovn.ovn_appctl.assert_called_once_with(
*expected_sb_call
)
# Reset mock
cluster_actions.ch_ovn.ovn_appctl.reset_mock()
# test kick from Northbound cluster
cluster_actions._kick_server("northbound", server_id)
cluster_actions.ch_ovn.ovn_appctl.assert_called_once_with(
*expected_nb_call
)
def test_kick_server_unknown_cluster(self):
"""Test failure when kicking server from unknown cluster.
Function _kick_server() expects either "southbound" or "northbound" as
value of 'cluster' parameter. Other values should raise ValueError.
"""
with self.assertRaises(ValueError):
cluster_actions._kick_server("foo", "11aa")
@patch.object(
cluster_actions.charms_openstack.charm, "provide_charm_instance"
)
@patch.object(cluster_actions, "_cluster_ip_map")
@patch.object(cluster_actions, "_format_cluster_status")
def test_cluster_status(
self, format_cluster_mock, cluster_map_mock, provide_instance_mock
):
"""Test cluster-status action implementation."""
sb_raw_status = "Southbound status"
nb_raw_status = "Northbound status"
charm_instance = MagicMock()
charm_instance.cluster_status.side_effect = [
sb_raw_status,
nb_raw_status,
]
provide_instance_mock.return_value = charm_instance
ip_map = {"ovn-central/0": "10.0.0.0"}
cluster_map_mock.return_value = ip_map
sb_cluster_status = {"Southbound": "status"}
nb_cluster_status = {"Northbound": "status"}
format_cluster_mock.side_effect = [
sb_cluster_status,
nb_cluster_status,
]
# Test successfully generating cluster status
cluster_actions.cluster_status()
expected_calls = [
call(
{
"ovnsb": yaml.safe_dump(
sb_cluster_status, sort_keys=False
)
}
),
call(
{
"ovnnb": yaml.dump(
nb_cluster_status, sort_keys=False
)
}
),
]
cluster_actions.ch_core.hookenv.action_set.has_calls(expected_calls)
cluster_actions.ch_core.hookenv.action_fail.asser_not_called()
# Reset mocks
cluster_actions.ch_core.hookenv.action_set.reset_mock()
# Test failure to generate cluster status
msg = "parsing failed"
format_cluster_mock.side_effect = (
cluster_actions.StatusParsingException(msg)
)
cluster_actions.cluster_status()
cluster_actions.ch_core.hookenv.action_set.assert_not_called()
cluster_actions.ch_core.hookenv.action_fail.assert_called_once_with(
msg
)
@patch.object(cluster_actions, "_kick_server")
def test_cluster_kick_no_server(self, kick_server_mock):
"""Test running cluster-kick action without providing any server ID."""
cluster_actions.ch_core.hookenv.action_get.return_value = ""
err = "At least one server ID to kick must be specified."
cluster_actions.cluster_kick()
cluster_actions.ch_core.hookenv.action_fail.assert_called_once_with(
err
)
cluster_actions.ch_core.hookenv.action_set.assert_not_called()
kick_server_mock.assert_not_called()
@patch.object(cluster_actions, "_kick_server")
def test_cluster_kick_sb_server(self, kick_server_mock):
"""Test kicking single Southbound server from cluster."""
sb_id = "11aa"
nb_id = ""
expected_msg = {"ovnsb": "requested kick of {}".format(sb_id)}
# Test successfully kicking server from Southbound cluster
cluster_actions.ch_core.hookenv.action_get.side_effect = [
sb_id,
nb_id,
]
cluster_actions.cluster_kick()
cluster_actions.ch_core.hookenv.action_fail.assert_not_called()
cluster_actions.ch_core.hookenv.action_set.assert_called_once_with(
expected_msg
)
kick_server_mock.assert_called_once_with("southbound", sb_id)
# Reset mocks
cluster_actions.ch_core.hookenv.action_set.reset_mock()
cluster_actions.ch_core.hookenv.action_fail.reset_mock()
kick_server_mock.reset_mock()
cluster_actions.ch_core.hookenv.action_get.side_effect = [
sb_id,
nb_id,
]
# Test failure to kick server from Southbound cluster
process_output = "Failed to kick server"
exception = cluster_actions.subprocess.CalledProcessError(
-1, "/usr/sbin/ovs-appctl", process_output
)
kick_server_mock.side_effect = exception
err = "Failed to kick Southbound cluster member {}: {}".format(
sb_id, process_output
)
cluster_actions.cluster_kick()
cluster_actions.ch_core.hookenv.action_set.assert_not_called()
cluster_actions.ch_core.hookenv.action_fail.assert_called_once_with(
err
)
kick_server_mock.assert_called_once_with("southbound", sb_id)
@patch.object(cluster_actions, "_kick_server")
def test_cluster_kick_nb_server(self, kick_server_mock):
"""Test kicking single Northbound server from cluster."""
sb_id = ""
nb_id = "22bb"
expected_msg = {"ovnnb": "requested kick of {}".format(nb_id)}
# Test successfully kicking server from Northbound cluster
cluster_actions.ch_core.hookenv.action_get.side_effect = [
sb_id,
nb_id,
]
cluster_actions.cluster_kick()
cluster_actions.ch_core.hookenv.action_fail.assert_not_called()
cluster_actions.ch_core.hookenv.action_set.assert_called_once_with(
expected_msg
)
kick_server_mock.assert_called_once_with("northbound", nb_id)
# Reset mocks
cluster_actions.ch_core.hookenv.action_set.reset_mock()
cluster_actions.ch_core.hookenv.action_fail.reset_mock()
kick_server_mock.reset_mock()
cluster_actions.ch_core.hookenv.action_get.side_effect = [
sb_id,
nb_id,
]
# Test failure to kick server from Northbound cluster
process_output = "Failed to kick server"
exception = cluster_actions.subprocess.CalledProcessError(
-1, "/usr/sbin/ovs-appctl", process_output
)
kick_server_mock.side_effect = exception
err = "Failed to kick Northbound cluster member {}: {}".format(
nb_id, process_output
)
cluster_actions.cluster_kick()
cluster_actions.ch_core.hookenv.action_set.assert_not_called()
cluster_actions.ch_core.hookenv.action_fail.assert_called_once_with(
err
)
kick_server_mock.assert_called_once_with("northbound", nb_id)
@patch.object(cluster_actions, "_kick_server")
def test_cluster_kick_both_server(self, kick_server_mock):
"""Test kicking Southbound and Northbound servers from cluster."""
sb_id = "11bb"
nb_id = "22bb"
expected_func_set_calls = [
call({"ovnsb": "requested kick of {}".format(sb_id)}),
call({"ovnnb": "requested kick of {}".format(nb_id)}),
]
kick_commands = [
call("southbound", sb_id),
call("northbound", nb_id),
]
# Test successfully kicking servers from Northbound and Southbound
# cluster
cluster_actions.ch_core.hookenv.action_get.side_effect = [
sb_id,
nb_id,
]
cluster_actions.cluster_kick()
cluster_actions.ch_core.hookenv.action_fail.assert_not_called()
cluster_actions.ch_core.hookenv.action_set.has_calls(
expected_func_set_calls
)
kick_server_mock.has_calls(kick_commands)
# Reset mocks
cluster_actions.ch_core.hookenv.action_set.reset_mock()
cluster_actions.ch_core.hookenv.action_fail.reset_mock()
cluster_actions.ch_ovn.ovn_appctl.reset_mock()
cluster_actions.ch_core.hookenv.action_get.side_effect = [
sb_id,
nb_id,
]
# Test failure to kick servers from Northbound and Southbound
# clusters
process_output = "Failed to kick server"
exception = cluster_actions.subprocess.CalledProcessError(
-1, "/usr/sbin/ovs-appctl", process_output
)
kick_server_mock.side_effect = exception
errors = [
call(
"Failed to kick Southbound cluster member {}: {}".format(
sb_id, process_output
)
),
call(
"Failed to kick Northbound cluster member {}: {}".format(
nb_id, process_output
)
),
]
cluster_actions.cluster_kick()
cluster_actions.ch_core.hookenv.action_set.assert_not_called()
cluster_actions.ch_core.hookenv.action_fail.has_calls(errors)
kick_server_mock.has_calls(kick_commands)
@patch.object(cluster_actions.reactive, "endpoint_from_flag")
def test_main_no_cluster(self, endpoint):
"""Test refusal to run action if unit is not in cluster."""
endpoint.return_value = None
err = "Unit is not part of an OVN cluster."
cluster_actions.main([])
cluster_actions.ch_core.hookenv.action_fail.assert_called_once_with(
err
)
self.mapped_action_cluster_kick.assert_not_called()
self.mapped_action_cluster_status.assert_not_called()
@patch.object(cluster_actions.reactive, "endpoint_from_flag")
def test_main_unknown_action(self, endpoint):
"""Test executing unknown action from main function."""
endpoint.return_value = MagicMock()
action = "unknown-action"
action_path = (
"/var/lib/juju/agents/unit-ovn-central-0/charm/actions/" + action
)
err = "Action {} undefined".format(action)
result = cluster_actions.main([action_path])
self.assertEquals(result, err)
self.mapped_action_cluster_kick.assert_not_called()
self.mapped_action_cluster_status.assert_not_called()
@patch.object(cluster_actions.reactive, "endpoint_from_flag")
def test_main_cluster_kick(self, endpoint):
"""Test executing cluster-kick action from main function."""
endpoint.return_value = MagicMock()
action = "cluster-kick"
action_path = (
"/var/lib/juju/agents/unit-ovn-central-0/charm/actions/" + action
)
cluster_actions.main([action_path])
cluster_actions.ch_core.hookenv.action_fail.assert_not_called()
self.mapped_action_cluster_kick.assert_called_once_with()
@patch.object(cluster_actions.reactive, "endpoint_from_flag")
def test_main_cluster_status(self, endpoint):
"""Test executing cluster-status action from main function."""
endpoint.return_value = MagicMock()
action = "cluster-status"
action_path = (
"/var/lib/juju/agents/unit-ovn-central-0/charm/actions/" + action
)
cluster_actions.main([action_path])
cluster_actions.ch_core.hookenv.action_fail.assert_not_called()
self.mapped_action_cluster_status.assert_called_once_with()

View File

@ -705,3 +705,125 @@ class TestOVNCentralCharm(Helper):
devmode=True)
self.install.assert_not_called()
self.remove.assert_not_called()
def test_cluster_leave_ok(self):
"""Test successfully leaving OVN cluster."""
self.patch_object(
ovn_central.ch_ovn,
'ovn_appctl'
)
expected_calls = [
mock.call("ovnsb_db", ("cluster/leave", "OVN_Southbound")),
mock.call("ovnnb_db", ("cluster/leave", "OVN_Northbound")),
]
self.target.leave_cluster()
ovn_central.ch_ovn.ovn_appctl.assert_has_calls(expected_calls)
def test_cluster_leave_fail(self):
"""Test failure during leaving of OVN cluster."""
self.patch_object(
ovn_central.ch_ovn,
'ovn_appctl'
)
self.patch_object(
ovn_central.ch_core.hookenv,
'log'
)
expected_err = ovn_central.subprocess.CalledProcessError(1, "foo")
ovn_central.ch_ovn.ovn_appctl.side_effect = expected_err
error_msg = (
"Failed to leave {} cluster. You can use 'cluster-kick' juju "
"action on remaining units to remove lingering cluster members."
)
expected_ovn_calls = [
mock.call("ovnsb_db", ("cluster/leave", "OVN_Southbound")),
mock.call("ovnnb_db", ("cluster/leave", "OVN_Northbound")),
]
expected_log_calls = [
mock.call(
error_msg.format("Southbound"),
ovn_central.ch_core.hookenv.ERROR
),
mock.call(
error_msg.format("Northbound"),
ovn_central.ch_core.hookenv.ERROR
),
]
self.target.leave_cluster()
ovn_central.ch_ovn.ovn_appctl.assert_has_calls(expected_ovn_calls)
ovn_central.ch_core.hookenv.log.assert_has_calls(expected_log_calls,
any_order=True)
def test_server_in_cluster(self):
"""Test detection of server in cluster."""
ipv4_in_cluster = "10.0.0.10"
ipv6_in_cluster = "2001:db8:3333:4444:5555:6666:7777:8888"
not_in_cluster = "10.0.0.1"
servers = [
("aa11", "ssl:{}:6644".format(ipv4_in_cluster)),
("bb22", "ssl:{}:6644".format(ipv6_in_cluster)),
("cc33", "ssl:10.0.0.12:6644"),
]
cluster_status = self.FakeClusterStatus(is_cluster_leader=True)
cluster_status.servers = servers
# Find expected IPv4 address in server list
self.assertTrue(
self.target.is_server_in_cluster(ipv4_in_cluster, cluster_status)
)
# Find expected IPv6 address in server list
self.assertTrue(
self.target.is_server_in_cluster(ipv6_in_cluster, cluster_status)
)
# Don't find unexpected IP in server list
self.assertFalse(
self.target.is_server_in_cluster(not_in_cluster, cluster_status)
)
def test_wait_for_server_leave_fail(self):
"""Test waiting until server leaves cluster.
This test verifies scenario when server does not leave cluster
before timeout.
"""
self.patch_object(ovn_central.time, "sleep")
self.patch_target("is_server_in_cluster", return_value=True)
self.patch_target("cluster_status")
timeout = 30
expected_retries = 6
expected_calls = []
for i in range(expected_retries):
expected_calls.append(mock.call("ovnsb_db"))
expected_calls.append(mock.call("ovnnb_db"))
result = self.target.wait_for_server_leave("10.0.0.1", timeout)
self.assertFalse(result)
self.target.cluster_status.assert_has_calls(expected_calls)
def test_wait_for_server_leave_true(self):
"""Test waiting until server leaves cluster.
This test verifies scenario when server successfully leaves
cluster during the timeout period.
"""
self.patch_object(ovn_central.time, "sleep")
self.patch_target("is_server_in_cluster", return_value=False)
self.patch_target("cluster_status")
timeout = 30
expected_calls = [
mock.call("ovnsb_db"),
mock.call("ovnnb_db"),
]
result = self.target.wait_for_server_leave("10.0.0.1", timeout)
self.assertTrue(result)
self.target.cluster_status.assert_has_calls(expected_calls)

View File

@ -37,7 +37,8 @@ class TestRegisteredHooks(test_utils.TestRegisteredHooks):
'coordinator.requested.upgrade',
'config.changed.source',
'config.changed.ovn-source'),
'configure_firewall': ('is-update-status-hook',),
'configure_firewall': ('is-update-status-hook',
'endpoint.ovsdb-peer.departed'),
'enable_default_certificates': ('is-update-status-hook',
'leadership.is_leader',),
'initialize_firewall': ('is-update-status-hook',
@ -54,7 +55,8 @@ class TestRegisteredHooks(test_utils.TestRegisteredHooks):
'coordinator.granted.upgrade',
'coordinator.requested.upgrade',
'config.changed.source',
'config.changed.ovn-source'),
'config.changed.ovn-source',
'endpoint.ovsdb-peer.departed'),
'configure_nrpe': ('charm.paused', 'is-update-status-hook',),
'stamp_fresh_deployment': ('charm.installed',
'leadership.set.install_stamp'),
@ -108,6 +110,7 @@ class TestRegisteredHooks(test_utils.TestRegisteredHooks):
'charm.installed',
'metrics-endpoint.available',
),
'handle_cluster_downscale': ('endpoint.ovsdb-peer.departed',),
},
'when_any': {
'configure_nrpe': ('config.changed.nagios_context',
@ -128,6 +131,9 @@ class TestRegisteredHooks(test_utils.TestRegisteredHooks):
'snap.installed.prometheus-ovn-exporter',
),
},
'hook': {
'leave_cluster': ('certificates-relation-broken',),
},
}
# test that the hooks were registered via the
# reactive.ovn_handlers
@ -271,3 +277,98 @@ class TestOvnCentralHandlers(test_utils.PatchHelper):
self.target.enable_services.return_value = True
handlers.render()
self.set_flag.assert_called_once_with('config.rendered')
def test_handle_cluster_downscale_leaving(self):
"""Test actions during departure of a peer unit.
This scenario tests actions of a unit that is departing the cluster.
"""
self.patch_object(handlers.reactive, 'is_flag_set')
self.is_flag_set.side_effect = [False, True]
self.patch_object(handlers.reactive, 'set_flag')
unit_name = 'ovn-central/3'
self.patch_object(
handlers.hookenv,
'departing_unit',
return_value=unit_name
)
self.patch_object(
handlers.hookenv,
'local_unit',
return_value=unit_name
)
handlers.handle_cluster_downscale()
self.target.leave_cluster.assert_called_once_with()
self.set_flag.assert_called_once_with('ovsdb-peer.left_cluster')
# subsequent calls do not trigger leave_cluster_calls()
handlers.handle_cluster_downscale()
self.target.leave_cluster.assert_called_once_with()
# unit that is leaving does not attempt to wait for remote
# unit to leave cluster.
self.target.wait_for_server_leave.assert_not_called()
def test_handle_cluster_downscale_not_leaving(self):
"""Test actions during departure of a peer unit.
This scenario tests actions of a unit whose peer is departing the
cluster.
"""
self.patch_object(handlers.reactive, 'is_flag_set', return_value=False)
self.patch_object(handlers.reactive, 'endpoint_from_name')
self.patch_object(handlers.reactive, 'set_flag')
self.patch_object(handlers, 'configure_firewall')
self.patch_object(handlers.hookenv, 'log')
local_unit_name = 'ovn-central/0'
departing_unit_name = 'ovn-central/3'
departing_unit_ip = '10.0.0.10'
departing_unit = mock.MagicMock()
departing_unit.received = {'bound-address': departing_unit_ip}
self.patch_object(
handlers.hookenv,
'departing_unit',
return_value=departing_unit_name
)
self.patch_object(
handlers.hookenv,
'local_unit',
return_value=local_unit_name
)
ovsdb_peer = mock.MagicMock()
ovsdb_peer.all_departed_units = {departing_unit_name: departing_unit}
self.endpoint_from_name.return_value = ovsdb_peer
ok_msg = ("Departing unit {} successfully disconnected from "
"cluster.".format(departing_unit_name)
)
fail_msg = (
"Departing unit {} failed to remove itself from cluster. "
"Please use action `cluster-kick` to remove straggling "
"servers from OVN cluster.".format(departing_unit_name)
)
# Test departing unit successfully leaving
self.target.wait_for_server_leave.return_value = True
handlers.handle_cluster_downscale()
self.target.wait_for_server_leave.assert_called_once_with(
departing_unit_ip
)
self.configure_firewall.assert_called_once_with()
self.log.assert_called_once_with(ok_msg, handlers.hookenv.INFO)
# Reset mocks
self.target.wait_for_server_leave.reset_mock()
self.configure_firewall.reset_mock()
self.log.reset_mock()
# Test departing unit failed to leave
self.target.wait_for_server_leave.return_value = False
handlers.handle_cluster_downscale()
self.target.wait_for_server_leave.assert_called_once_with(
departing_unit_ip
)
self.configure_firewall.assert_called_once_with()
self.log.assert_called_once_with(fail_msg, handlers.hookenv.WARNING)