Implement cluster update for Cluster API driver

Implement the Cluster API driver methods that are called
when node groups are added or the number of workers is
updated via the API.

This is mostly implemented by simply updating the Helm
chart values. A later patch will add upgrade support in
a similar way.

story: 2009780

Change-Id: If6b77fc0db7b78bc3a7446dfffb0e1601ea7f10b
Authored by Matt Pryor 2023-04-19 09:51:51 +00:00; committed by John Garbutt
parent f165a82b8a
commit 414575a450
4 changed files with 327 additions and 63 deletions
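
Read as a whole, the change treats every node group operation as a re-render of the Helm values, which Cluster API then reconciles. A minimal sketch of that mapping, using only the values shape visible in the _update_helm_release hunk below (the standalone function and the "master" role value are illustrative assumptions, not part of the commit):

# Sketch only: how node groups become Helm values in this driver.
# The role constant is assumed to be "master" here; the real
# constant lives in the driver module as NODE_GROUP_ROLE_CONTROLLER.
NODE_GROUP_ROLE_CONTROLLER = "master"

def nodegroup_values(nodegroups):
    return {
        "nodeGroups": [
            {
                # only the fields visible in the diff are sketched
                "machineFlavor": ng.flavor_id,
                "machineCount": ng.node_count,
            }
            for ng in nodegroups
            if ng.role != NODE_GROUP_ROLE_CONTROLLER
        ],
    }

Under this scheme a resize is just a new machineCount, and deleting a node group is its absence from the rendered list.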


@@ -123,6 +123,23 @@ class Driver(driver.Driver):
if md:
ng_state = NodeGroupState.PENDING
# When a machine deployment is deleted, it disappears straight
# away even when there are still machines belonging to it that
# are deleting
# In that case, we want to keep the nodegroup as DELETE_IN_PROGRESS
# until all the machines for the node group are gone
if (
not md
and nodegroup.status.startswith("DELETE_")
and self._nodegroup_machines_exist(cluster, nodegroup)
):
LOG.debug(
f"Node group {nodegroup.name} "
f"for cluster {cluster.uuid} "
"machine deployment gone, but machines still found."
)
ng_state = NodeGroupState.PENDING
md_status = md.get("status", {}) if md else {}
md_phase = md_status.get("phase")
if md_phase:
@@ -137,11 +154,15 @@ class Driver(driver.Driver):
# For delete we are waiting for not present
if nodegroup.status.startswith("DELETE_"):
if ng_state == NodeGroupState.NOT_PRESENT:
- # Conductor will delete default nodegroups
- # when cluster is deleted
if not nodegroup.is_default:
+ # Conductor will delete default nodegroups
+ # when cluster is deleted, but non default
+ # node groups should be deleted here.
nodegroup.destroy()
LOG.debug(
f"Node group deleted: {nodegroup.name} "
- f"for cluster {cluster.uuid}"
+ f"for cluster {cluster.uuid} "
+ f"which is_default: {nodegroup.is_default}"
)
# signal the node group has been deleted
return None
@@ -195,6 +216,19 @@ class Driver(driver.Driver):
return nodegroup
def _nodegroup_machines_exist(self, cluster, nodegroup):
cluster_name = self._get_chart_release_name(cluster)
nodegroup_name = self._sanitised_name(nodegroup.name)
machines = self._k8s_client.get_all_machines_by_label(
{
"capi.stackhpc.com/cluster": cluster_name,
"capi.stackhpc.com/component": "worker",
"capi.stackhpc.com/node-group": nodegroup_name,
},
self._namespace(cluster),
)
return bool(machines)
def _update_cluster_api_address(self, cluster, capi_cluster):
# As soon as we know the API address, we should set it
# This means users can access the API even if the create is
@@ -237,8 +271,11 @@ class Driver(driver.Driver):
# Check the status of the addons
addons = self._k8s_client.get_addons_by_label(
"addons.stackhpc.com/cluster",
self._sanitised_name(self._get_chart_release_name(cluster)),
{
"addons.stackhpc.com/cluster": self._sanitised_name(
self._get_chart_release_name(cluster)
),
},
self._namespace(cluster)
)
for addon in addons:
@@ -422,7 +459,9 @@ class Driver(driver.Driver):
# NOTE(mkjpryor) default on, like the heat driver
return kube_dash_label != "false"
- def _update_helm_release(self, context, cluster):
+ def _update_helm_release(self, context, cluster, nodegroups=None):
+ if nodegroups is None:
+ nodegroups = cluster.nodegroups
cluster_template = cluster.cluster_template
image_id, kube_version = self._get_image_details(
context, cluster_template.image_id
@@ -466,7 +505,7 @@ class Driver(driver.Driver):
"machineFlavor": ng.flavor_id,
"machineCount": ng.node_count,
}
- for ng in cluster.nodegroups
+ for ng in nodegroups
if ng.role != NODE_GROUP_ROLE_CONTROLLER
],
}
@@ -597,7 +636,12 @@ class Driver(driver.Driver):
def update_cluster(
self, context, cluster, scale_manager=None, rollback=False
):
raise NotImplementedError("don't support update yet")
# Cluster API refuses to update things like cluster networking,
# so it is safest not to implement this for now
# TODO(mkjpryor) Check what bits of update we can support
raise NotImplementedError(
"Updating a cluster in this way is not currently supported"
)
def delete_cluster(self, context, cluster):
LOG.info("Starting to delete cluster %s", cluster.uuid)
@@ -628,7 +672,9 @@ class Driver(driver.Driver):
nodes_to_remove,
nodegroup=None,
):
raise NotImplementedError("don't support removing nodes this way yet")
if nodes_to_remove:
LOG.warning("Removing specific nodes is not currently supported")
self._update_helm_release(context, cluster)
def upgrade_cluster(
self,
@@ -643,13 +689,30 @@ class Driver(driver.Driver):
raise NotImplementedError("don't support upgrade yet")
def create_nodegroup(self, context, cluster, nodegroup):
raise NotImplementedError("we don't support node groups yet")
nodegroup.status = fields.ClusterStatus.CREATE_IN_PROGRESS
nodegroup.save()
self._update_helm_release(context, cluster)
def update_nodegroup(self, context, cluster, nodegroup):
raise NotImplementedError("we don't support node groups yet")
nodegroup.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
nodegroup.save()
self._update_helm_release(context, cluster)
def delete_nodegroup(self, context, cluster, nodegroup):
raise NotImplementedError("we don't support node groups yet")
nodegroup.status = fields.ClusterStatus.DELETE_IN_PROGRESS
nodegroup.save()
# Remove the nodegroup being deleted from the nodegroups
# for the Helm release
self._update_helm_release(
context,
cluster,
list(
[ng for ng in cluster.nodegroups if ng.name != nodegroup.name]
),
)
def create_federation(self, context, federation):
raise NotImplementedError("Will not implement 'create_federation'")


@@ -141,29 +141,30 @@ class Client(requests.Session):
def get_machine_deployment(self, name, namespace):
return MachineDeployment(self).fetch(name, namespace)
- def get_manifests_by_label(self, label, value, namespace):
+ def get_manifests_by_label(self, labels, namespace):
return list(
Manifests(self).fetch_all_by_label(
- label,
- value,
+ labels,
namespace
)
)
- def get_helm_releases_by_label(self, label, value, namespace):
+ def get_helm_releases_by_label(self, labels, namespace):
return list(
HelmRelease(self).fetch_all_by_label(
- label,
- value,
+ labels,
namespace
)
)
- def get_addons_by_label(self, label, value, namespace):
- addons = list(self.get_manifests_by_label(label, value, namespace))
- addons.extend(self.get_helm_releases_by_label(label, value, namespace))
+ def get_addons_by_label(self, labels, namespace):
+ addons = list(self.get_manifests_by_label(labels, namespace))
+ addons.extend(self.get_helm_releases_by_label(labels, namespace))
return addons
+ def get_all_machines_by_label(self, labels, namespace):
+ return list(Machine(self).fetch_all_by_label(labels, namespace))
class Resource:
def __init__(self, client):
@@ -202,12 +203,13 @@ class Resource:
else:
response.raise_for_status()
- def fetch_all_by_label(self, label, value, namespace=None):
- """Fetches all objects with the specified label from cluster."""
+ def fetch_all_by_label(self, labels, namespace=None):
+ """Fetches objects matching the labels from the target cluster."""
assert self.namespaced == bool(namespace)
+ label_selector = ",".join(f"{k}={v}" for k, v in labels.items())
continue_token = ""
while True:
- params = {"labelSelector": f"{label}={value}"}
+ params = {"labelSelector": label_selector}
if continue_token:
params["continue"] = continue_token
response = self.client.get(
@@ -270,6 +272,10 @@ class KubeadmControlPlane(Resource):
api_version = "controlplane.cluster.x-k8s.io/v1beta1"
class Machine(Resource):
api_version = "cluster.x-k8s.io/v1beta1"
class Manifests(Resource):
api_version = "addons.stackhpc.com/v1alpha1"
plural_name = "manifests"
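
The client side of the change swaps the single label/value pair for a labels dict, which is collapsed into one Kubernetes labelSelector string, while list calls keep following the server's continue token. A standalone sketch of that loop, assuming session behaves like a requests.Session and the endpoint returns Kubernetes-style list bodies (all names here are illustrative):

# Sketch of the selector handling in fetch_all_by_label above.
def list_by_label(session, url, labels):
    selector = ",".join(f"{k}={v}" for k, v in labels.items())
    continue_token = ""
    while True:
        params = {"labelSelector": selector}
        if continue_token:
            params["continue"] = continue_token
        body = session.get(url, params=params).json()
        yield from body.get("items", [])
        continue_token = body.get("metadata", {}).get("continue", "")
        if not continue_token:
            return

For the labels used by _nodegroup_machines_exist, this yields a selector like capi.stackhpc.com/cluster=cluster-example-a-111111111111,capi.stackhpc.com/component=worker,capi.stackhpc.com/node-group=workers, which is exactly the request the new tests assert.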


@@ -20,7 +20,6 @@ from magnum.drivers.cluster_api import driver
from magnum.drivers.cluster_api import helm
from magnum.drivers.cluster_api import kubernetes
from magnum.drivers.common import k8s_monitor
- from magnum import objects
from magnum.objects import fields
from magnum.tests.unit.db import base
from magnum.tests.unit.objects import utils as obj_utils
@@ -210,7 +209,9 @@ class ClusterAPIDriverTest(base.DbTestCase):
mock_load.return_value = mock_client
nodegroup = mock.MagicMock()
nodegroup.name = "workers"
- mock_client.get_machine_deployment.return_value = None
+ nodegroup.status = fields.ClusterStatus.CREATE_IN_PROGRESS
+ md = {"status": {}}
+ mock_client.get_machine_deployment.return_value = md
self.driver._update_worker_nodegroup_status(
self.cluster_obj, nodegroup
@@ -220,7 +221,7 @@
"cluster-example-a-111111111111-workers", "magnum-fakeproject"
)
mock_update.assert_called_once_with(
- self.cluster_obj, mock.ANY, driver.NodeGroupState.NOT_PRESENT
+ self.cluster_obj, nodegroup, driver.NodeGroupState.PENDING
)
@mock.patch.object(driver.Driver, "_update_nodegroup_status")
@@ -255,6 +256,7 @@
mock_load.return_value = mock_client
nodegroup = mock.MagicMock()
nodegroup.name = "workers"
nodegroup.status = fields.ClusterStatus.CREATE_IN_PROGRESS
md = {"status": {"phase": "Failed"}}
mock_client.get_machine_deployment.return_value = md
@@ -266,9 +268,100 @@
"cluster-example-a-111111111111-workers", "magnum-fakeproject"
)
mock_update.assert_called_once_with(
- self.cluster_obj, mock.ANY, driver.NodeGroupState.FAILED
+ self.cluster_obj, nodegroup, driver.NodeGroupState.FAILED
)
@mock.patch.object(driver.Driver, "_update_nodegroup_status")
@mock.patch.object(kubernetes.Client, "load")
def test_update_worker_nodegroup_status_not_present_creating(
self, mock_load, mock_update
):
mock_client = mock.MagicMock(spec=kubernetes.Client)
mock_load.return_value = mock_client
nodegroup = mock.MagicMock()
nodegroup.name = "workers"
nodegroup.status = fields.ClusterStatus.CREATE_IN_PROGRESS
mock_client.get_machine_deployment.return_value = None
mock_client.get_all_machines_by_label.return_value = None
self.driver._update_worker_nodegroup_status(
self.cluster_obj, nodegroup
)
mock_client.get_machine_deployment.assert_called_once_with(
"cluster-example-a-111111111111-workers", "magnum-fakeproject"
)
mock_update.assert_called_once_with(
self.cluster_obj, nodegroup, driver.NodeGroupState.NOT_PRESENT
)
mock_client.get_all_machines_by_label.assert_not_called()
nodegroup.destroy.assert_not_called()
nodegroup.save.assert_not_called()
@mock.patch.object(driver.Driver, "_update_nodegroup_status")
@mock.patch.object(kubernetes.Client, "load")
def test_update_worker_nodegroup_status_not_present_deleting(
self, mock_load, mock_update
):
mock_client = mock.MagicMock(spec=kubernetes.Client)
mock_load.return_value = mock_client
nodegroup = mock.MagicMock()
nodegroup.name = "workers"
nodegroup.status = fields.ClusterStatus.DELETE_IN_PROGRESS
machine = {"status": {}}
mock_client.get_machine_deployment.return_value = None
mock_client.get_all_machines_by_label.return_value = machine
self.driver._update_worker_nodegroup_status(
self.cluster_obj, nodegroup
)
mock_client.get_machine_deployment.assert_called_once_with(
"cluster-example-a-111111111111-workers", "magnum-fakeproject"
)
mock_client.get_all_machines_by_label.assert_called_once_with(
{
"capi.stackhpc.com/cluster": "cluster-example-a-111111111111",
"capi.stackhpc.com/component": "worker",
"capi.stackhpc.com/node-group": "workers",
},
"magnum-fakeproject",
)
mock_update.assert_called_once_with(
self.cluster_obj, nodegroup, driver.NodeGroupState.PENDING
)
@mock.patch.object(kubernetes.Client, "load")
def test_update_worker_nodegroup_status_machines_missing_non_default(
self, mock_load
):
mock_client = mock.MagicMock(spec=kubernetes.Client)
mock_load.return_value = mock_client
nodegroup = mock.MagicMock()
nodegroup.name = "workers"
nodegroup.status = fields.ClusterStatus.DELETE_IN_PROGRESS
nodegroup.is_default = False
mock_client.get_machine_deployment.return_value = None
mock_client.get_all_machines_by_label.return_value = None
self.driver._update_worker_nodegroup_status(
self.cluster_obj, nodegroup
)
mock_client.get_machine_deployment.assert_called_once_with(
"cluster-example-a-111111111111-workers", "magnum-fakeproject"
)
mock_client.get_all_machines_by_label.assert_called_once_with(
{
"capi.stackhpc.com/cluster": "cluster-example-a-111111111111",
"capi.stackhpc.com/component": "worker",
"capi.stackhpc.com/node-group": "workers",
},
"magnum-fakeproject",
)
nodegroup.destroy.assert_called_once_with()
nodegroup.save.assert_not_called()
@mock.patch.object(driver.Driver, "_update_nodegroup_status")
@mock.patch.object(kubernetes.Client, "load")
def test_update_worker_nodegroup_status_running(
@@ -441,6 +534,27 @@
self.cluster_obj, mock.ANY, driver.NodeGroupState.READY
)
@mock.patch.object(kubernetes.Client, "load")
def test_nodegroup_machines_exist(self, mock_load):
mock_client = mock.MagicMock(spec=kubernetes.Client)
mock_load.return_value = mock_client
mock_client.get_all_machines_by_label.return_value = ["item1"]
nodegroup = obj_utils.create_test_nodegroup(self.context)
result = self.driver._nodegroup_machines_exist(
self.cluster_obj, nodegroup
)
self.assertTrue(result)
mock_client.get_all_machines_by_label.assert_called_once_with(
{
"capi.stackhpc.com/cluster": "cluster-example-a-111111111111",
"capi.stackhpc.com/component": "worker",
"capi.stackhpc.com/node-group": "nodegroup1",
},
"magnum-fakeproject",
)
@mock.patch.object(k8s_monitor, "K8sMonitor")
def test_get_monitor(self, mock_mon):
self.driver.get_monitor(self.context, self.cluster_obj)
@@ -795,6 +909,18 @@
self.assertIsNone(result)
def test_update_nodegroup_status_delete_non_default_destroy(self):
nodegroup = mock.MagicMock()
nodegroup.status = fields.ClusterStatus.DELETE_IN_PROGRESS
nodegroup.is_default = False
result = self.driver._update_nodegroup_status(
self.cluster_obj, nodegroup, driver.NodeGroupState.NOT_PRESENT
)
self.assertIsNone(result)
nodegroup.destroy.assert_called_once_with()
def test_update_nodegroup_status_delete_unexpected_state(self):
nodegroup = obj_utils.create_test_nodegroup(self.context)
nodegroup.status = fields.ClusterStatus.ROLLBACK_IN_PROGRESS
@@ -1271,16 +1397,27 @@
self.cluster_obj,
)
- def test_resize_cluster(self):
- self.assertRaises(
- NotImplementedError,
- self.driver.resize_cluster,
+ @mock.patch.object(driver.Driver, "_update_helm_release")
+ def test_resize_cluster(self, mock_update):
+ self.driver.resize_cluster(
self.context,
self.cluster_obj,
None,
4,
None,
None,
)
+ mock_update.assert_called_once_with(self.context, self.cluster_obj)
+ @mock.patch.object(driver.Driver, "_update_helm_release")
+ def test_resize_cluster_ignore_nodes_to_remove(self, mock_update):
+ self.driver.resize_cluster(
+ self.context,
+ self.cluster_obj,
+ None,
+ ["node1"],
+ None,
+ )
+ mock_update.assert_called_once_with(self.context, self.cluster_obj)
def test_upgrade_cluster(self):
self.assertRaises(
@@ -1293,31 +1430,49 @@
None,
)
- def test_create_nodegroup(self):
- self.assertRaises(
- NotImplementedError,
- self.driver.create_nodegroup,
- self.context,
- self.cluster_obj,
- objects.NodeGroup(),
+ @mock.patch.object(driver.Driver, "_update_helm_release")
+ def test_create_nodegroup(self, mock_update):
+ node_group = mock.MagicMock()
+ self.driver.create_nodegroup(
+ self.context, self.cluster_obj, node_group
)
- def test_update_nodegroup(self):
- self.assertRaises(
- NotImplementedError,
- self.driver.update_nodegroup,
+ mock_update.assert_called_once_with(self.context, self.cluster_obj)
+ node_group.save.assert_called_once_with()
+ self.assertEqual("CREATE_IN_PROGRESS", node_group.status)
+ @mock.patch.object(driver.Driver, "_update_helm_release")
+ def test_update_nodegroup(self, mock_update):
+ node_group = mock.MagicMock()
+ self.driver.update_nodegroup(
self.context,
self.cluster_obj,
- objects.NodeGroup(),
+ node_group,
)
- def test_delete_nodegroup(self):
- self.assertRaises(
- NotImplementedError,
- self.driver.delete_nodegroup,
+ mock_update.assert_called_once_with(self.context, self.cluster_obj)
+ node_group.save.assert_called_once_with()
+ self.assertEqual("UPDATE_IN_PROGRESS", node_group.status)
+ @mock.patch.object(driver.Driver, "_update_helm_release")
+ def test_delete_nodegroup(self, mock_update):
+ self.driver.delete_nodegroup(
self.context,
self.cluster_obj,
- objects.NodeGroup(),
+ self.cluster_obj.nodegroups[1],
)
+ mock_update.assert_called_once_with(
+ self.context,
+ self.cluster_obj,
+ mock.ANY,
+ )
+ # because nodegroups equality is broken
+ self.assertEqual(
+ self.cluster_obj.nodegroups[0].as_dict(),
+ mock_update.call_args.args[2][0].as_dict(),
+ )
def test_create_federation(self):


@@ -258,7 +258,7 @@ class TestKubernetesClient(base.TestCase):
self.assertEqual("mock_json", cluster)
@mock.patch.object(requests.Session, "request")
- def test_get_manifests_by_label_found(self, mock_request):
+ def test_get_manifests_by_label(self, mock_request):
items = [
{
"kind": "Manifests",
@@ -281,7 +281,7 @@
mock_request.return_value = mock_response
client = kubernetes.Client(TEST_KUBECONFIG)
manifests = client.get_manifests_by_label("label", "cluster1", "ns1")
manifests = client.get_manifests_by_label({"label": "cluster1"}, "ns1")
mock_request.assert_called_once_with(
"GET",
@@ -295,7 +295,7 @@
self.assertEqual(items, manifests)
@mock.patch.object(requests.Session, "request")
- def test_get_helm_releases_by_label_found(self, mock_request):
+ def test_get_helm_releases_by_label(self, mock_request):
items = [
{
"kind": "HelmRelease",
@@ -319,8 +319,7 @@
client = kubernetes.Client(TEST_KUBECONFIG)
helm_releases = client.get_helm_releases_by_label(
"label",
"cluster1",
{"label": "cluster1"},
"ns1"
)
@@ -371,8 +370,7 @@
client = kubernetes.Client(TEST_KUBECONFIG)
helm_releases = client.get_helm_releases_by_label(
"label",
"cluster1",
{"label": "cluster1"},
"ns1"
)
@@ -404,7 +402,7 @@
@mock.patch.object(kubernetes.Client, "get_helm_releases_by_label")
@mock.patch.object(kubernetes.Client, "get_manifests_by_label")
- def test_get_addons_by_label_found(
+ def test_get_addons_by_label(
self,
mock_get_manifests,
mock_get_helm_releases
@@ -434,16 +432,58 @@
mock_get_helm_releases.return_value = helm_releases
client = kubernetes.Client(TEST_KUBECONFIG)
addons = client.get_addons_by_label("label", "cluster1", "ns1")
addons = client.get_addons_by_label({"label": "cluster1"}, "ns1")
mock_get_manifests.assert_called_once_with(
"label",
"cluster1",
{"label": "cluster1"},
"ns1"
)
mock_get_helm_releases.assert_called_once_with(
"label",
"cluster1",
{"label": "cluster1"},
"ns1"
)
self.assertEqual(manifests + helm_releases, addons)
@mock.patch.object(requests.Session, "request")
def test_get_all_machines_by_label(self, mock_request):
items = [
{
"kind": "Machine",
"metadata": {
"name": f"machine{idx}",
"namespace": "ns1"
},
}
for idx in range(5)
]
mock_response = mock.Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"metadata": {
"continue": "",
},
"items": items,
}
mock_request.return_value = mock_response
client = kubernetes.Client(TEST_KUBECONFIG)
machines = client.get_all_machines_by_label(
{"capi.stackhpc.com/cluster": "cluster_name", "foo": "bar"},
"ns1"
)
mock_request.assert_called_once_with(
"GET",
(
"https://test:6443/apis/cluster.x-k8s.io/"
"v1beta1/namespaces/ns1/machines"
),
params={
"labelSelector": (
"capi.stackhpc.com/cluster=cluster_name,foo=bar"
)
},
allow_redirects=True,
)
self.assertEqual(items, machines)