ClusterAPI: write secrets into capi mgr k8s

Create an app cred and write it into the capi managmenet
cluster where it is expected by the helm chart.
Also write in the certs generated by magnum such that
magnum knowns what the kubeconfig will be.

To do this we add a simple kuberenetes client.
For best evetlet compatiblity,
this client uses the requests libary
to make REST API calls to kubernetes.

Future patches will extend the k8s client to
do other things, like check on the current
progress of cluster creation, after the helm
chart has created the appropriate resources
within the capi management cluster.

story: 2009780

Change-Id: I80c9f8fdf971dcd29043db1ef8ae0e8dc5b472bb
This commit is contained in:
John Garbutt 2023-04-25 12:08:49 +01:00 committed by John Garbutt
parent 6d930c8e6e
commit 40c3be03c6
6 changed files with 725 additions and 1 deletions

View File

@ -0,0 +1,71 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaml
import certifi
from magnum.common import clients
from magnum.common import utils
import magnum.conf
CONF = magnum.conf.CONF
def get_openstack_ca_certificate():
# This function returns the CA bundle to use when verifying TLS
# connections to the OpenStack API in both the Cluster API provider
# and OpenStack integrations on the cluster (e.g. OCCM, Cinder CSI)
#
# If no CA bundle is specified in config we use the CA bundle from
# certifi
# This is because the Cluster API provider contains NO trusted CAs
# and, because it is a pod in Kubernetes, it does NOT pick up the
# trusted CAs from the host
ca_certificate = utils.get_openstack_ca()
if not ca_certificate:
with open(certifi.where(), "r") as ca_file:
ca_certificate = ca_file.read()
return ca_certificate
def _create_app_cred(context, cluster):
osc = clients.OpenStackClients(context)
appcred = osc.keystone().client.application_credentials.create(
user=cluster.user_id,
name=f"magnum-{cluster.uuid}",
description=f"Magnum cluster ({cluster.uuid})",
)
return {
"clouds": {
"openstack": {
"identity_api_version": 3,
"region_name": osc.cinder_region_name(),
"interface": CONF.nova_client.endpoint_type.replace("URL", ""),
# This config item indicates whether TLS should be
# verified when connecting to the OpenStack API
"verify": CONF.drivers.verify_ca,
"auth": {
"auth_url": osc.url_for(
service_type="identity", interface="public"
),
"application_credential_id": appcred.id,
"application_credential_secret": appcred.secret,
},
},
},
}
def get_app_cred_yaml(context, cluster):
app_cred_dict = _create_app_cred(context, cluster)
return yaml.safe_dump(app_cred_dict)

View File

@ -13,13 +13,18 @@
import re
from oslo_log import log as logging
from oslo_utils import encodeutils
from magnum.api import utils as api_utils
from magnum.common import clients
from magnum.common import exception
from magnum.common import short_id
from magnum.common.x509 import operations as x509
from magnum.conductor.handlers.common import cert_manager
from magnum import conf
from magnum.drivers.cluster_api import app_creds
from magnum.drivers.cluster_api import helm
from magnum.drivers.cluster_api import kubernetes
from magnum.drivers.common import driver
LOG = logging.getLogger(__name__)
@ -29,6 +34,13 @@ CONF = conf.CONF
class Driver(driver.Driver):
def __init__(self):
self._helm_client = helm.Client()
self.__k8s_client = None
@property
def _k8s_client(self):
if not self.__k8s_client:
self.__k8s_client = kubernetes.Client.load()
return self.__k8s_client
@property
def provides(self):
@ -113,7 +125,6 @@ class Driver(driver.Driver):
values = {
"kubernetesVersion": kube_version,
"machineImageId": image_id,
# TODO(johngarbutt): need to generate app creds
"cloudCredentialsSecretName": self._get_app_cred_name(cluster),
# TODO(johngarbutt): need to respect requested networks
"clusterNetworking": {
@ -193,6 +204,75 @@ class Driver(driver.Driver):
def _get_chart_release_name(self, cluster):
return cluster.stack_id
def _k8s_resource_labels(self, cluster):
return {
"magnum.openstack.org/project-id": cluster.project_id,
"magnum.openstack.org/user-id": cluster.user_id,
"magnum.openstack.org/cluster-uuid": cluster.uuid,
}
def _create_appcred_secret(self, context, cluster):
ca_certificate = app_creds.get_openstack_ca_certificate()
appcred_yaml = app_creds.get_app_cred_yaml(context, cluster)
name = self._get_app_cred_name(cluster)
self._k8s_client.apply_secret(
name,
{
"metadata": {"labels": self._k8s_resource_labels(cluster)},
"stringData": {
"cacert": ca_certificate,
"clouds.yaml": appcred_yaml,
},
},
self._namespace(cluster),
)
def _decode_cert(self, cert):
return encodeutils.safe_decode(cert.get_certificate())
def _decode_key(self, cert):
key = x509.decrypt_key(
cert.get_private_key(),
cert.get_private_key_passphrase(),
)
return encodeutils.safe_decode(key)
def _ensure_certificate_secrets(self, context, cluster):
# Magnum creates CA certs for each of the Kubernetes components that
# must be trusted by the cluster
# In particular, this is required for "openstack coe cluster config"
# to work, as that doesn't communicate with the driver and instead
# relies on the correct CA being trusted by the cluster
# Cluster API looks for specific named secrets for each of the CAs,
# and generates them if they don't exist, so we create them here
# with the correct certificates in
certificates = {
"ca": cert_manager.get_cluster_ca_certificate(cluster, context),
"etcd": cert_manager.get_cluster_ca_certificate(
cluster, context, "etcd"
),
"proxy": cert_manager.get_cluster_ca_certificate(
cluster, context, "front_proxy"
),
"sa": cert_manager.get_cluster_magnum_cert(cluster, context),
}
for name, cert in certificates.items():
self._k8s_client.apply_secret(
self._sanitised_name(
self._get_chart_release_name(cluster), name
),
{
"metadata": {"labels": self._k8s_resource_labels(cluster)},
"type": "cluster.x-k8s.io/secret",
"stringData": {
"tls.crt": self._decode_cert(cert),
"tls.key": self._decode_key(cert),
},
},
self._namespace(cluster),
)
def create_cluster(self, context, cluster, cluster_create_timeout):
LOG.info("Starting to create cluster %s", cluster.uuid)
@ -201,6 +281,10 @@ class Driver(driver.Driver):
# and it makes renaming clusters in the API possible
self._generate_release_name(cluster)
self._k8s_client.ensure_namespace(self._namespace(cluster))
self._create_appcred_secret(context, cluster)
self._ensure_certificate_secrets(context, cluster)
self._update_helm_release(context, cluster)
def update_cluster(

View File

@ -0,0 +1,181 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import copy
import os
import pathlib
import re
import tempfile
import yaml
from oslo_log import log as logging
import requests
from magnum import conf
LOG = logging.getLogger(__name__)
CONF = conf.CONF
def file_or_data(obj, file_key):
"""Returns a path to a file containing the requested data.
First check if there is a file path already,
if the data is there, put it in a file,
and return a path to the temp directory
"""
if file_key in obj:
return obj[file_key]
data_key = file_key + "-data"
if data_key in obj:
# TODO(johngarbutt) check permissions on this file!
# and check how it gets deleted
with tempfile.NamedTemporaryFile(delete=False) as fd:
fd.write(base64.standard_b64decode(obj[data_key]))
return fd.name
return None
class Client(requests.Session):
"""Object for producing Kubernetes clients."""
KUBECONFIG_ENV_NAME = "KUBECONFIG"
def __init__(self, kubeconfig):
super().__init__()
cluster, user = self._get_cluster_and_user(kubeconfig)
self.server = cluster["server"].rstrip("/")
ca_file = file_or_data(cluster, "certificate-authority")
if ca_file:
self.verify = ca_file
# convert certs into files as required by requests
client_cert = file_or_data(user, "client-certificate")
assert client_cert is not None
self.cert = (client_cert, file_or_data(user, "client-key"))
def _get_cluster_and_user(self, kubeconfig):
# get the context
current_context = kubeconfig["current-context"]
context = [
c["context"]
for c in kubeconfig["contexts"]
if c["name"] == current_context
][0]
# extract cluster and user from context
cluster = [
c["cluster"]
for c in kubeconfig["clusters"]
if c["name"] == context["cluster"]
][0]
user = [
u["user"]
for u in kubeconfig["users"]
if u["name"] == context["user"]
][0]
return cluster, user
@classmethod
def _get_kubeconfig_path(cls):
# use config if specified
if CONF.capi_driver.kubeconfig_file:
return CONF.capi_driver.kubeconfig_file
if cls.KUBECONFIG_ENV_NAME in os.environ:
return os.environ[cls.KUBECONFIG_ENV_NAME]
# the default kubeconfig location
return pathlib.Path.home() / ".kube" / "config"
@classmethod
def _load_kubeconfig(cls, path):
with open(path) as fd:
return yaml.safe_load(fd)
@classmethod
def load(cls):
path = cls._get_kubeconfig_path()
kubeconfig = cls._load_kubeconfig(path)
return Client(kubeconfig)
def request(self, method, url, *args, **kwargs):
# Make sure to add the server to any relative URLs
if re.match(r"^http(s)://", url) is None:
url = "{}{}".format(self.server, url)
response = super().request(method, url, *args, **kwargs)
LOG.debug(
'Kubernetes API request: "%s %s" %s',
method,
url,
response.status_code,
)
return response
def ensure_namespace(self, namespace):
Namespace(self).apply(namespace)
def apply_secret(self, secret_name, data, namespace):
Secret(self).apply(secret_name, data, namespace)
class Resource:
def __init__(self, client):
self.client = client
assert hasattr(self, "api_version")
self.kind = getattr(self, "kind", type(self).__name__)
self.plural_name = getattr(
self, "plural_name", self.kind.lower() + "s"
)
self.namespaced = getattr(self, "namespaced", True)
def prepare_path(self, name=None, namespace=None):
# Begin with either /api or /apis depending whether the api version
# is the core API
prefix = "/apis" if "/" in self.api_version else "/api"
# Include the namespace unless the resource is namespaced
path_namespace = f"/namespaces/{namespace}" if namespace else ""
# Include the resource name if given
path_name = f"/{name}" if name else ""
return (
f"{prefix}/{self.api_version}{path_namespace}/"
f"{self.plural_name}{path_name}"
)
def apply(self, name, data=None, namespace=None):
"""Applies the given object to the target Kubernetes cluster."""
assert self.namespaced == bool(namespace)
body_data = copy.deepcopy(data) if data else {}
body_data["apiVersion"] = self.api_version
body_data["kind"] = self.kind
body_data.setdefault("metadata", {})["name"] = name
if namespace:
body_data["metadata"]["namespace"] = namespace
response = self.client.patch(
self.prepare_path(name, namespace),
json=body_data,
headers={"Content-Type": "application/apply-patch+yaml"},
params={"fieldManager": "magnum", "force": "true"},
)
response.raise_for_status()
return response.json()
class Namespace(Resource):
api_version = "v1"
namespaced = False
class Secret(Resource):
api_version = "v1"

View File

@ -0,0 +1,99 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from unittest import mock
from magnum.common import clients
from magnum.common import utils
from magnum.drivers.cluster_api import app_creds
from magnum.tests.unit.db import base
from magnum.tests.unit.objects import utils as obj_utils
class TestAppCreds(base.DbTestCase):
def setUp(self):
super().setUp()
self.cluster_obj = obj_utils.create_test_cluster(
self.context,
name="cluster_example_$A",
master_flavor_id="flavor_small",
flavor_id="flavor_medium",
)
@mock.patch.object(utils, "get_openstack_ca")
def test_get_openstack_ca_certificate(self, mock_ca):
mock_ca.return_value = "cert"
cert = app_creds.get_openstack_ca_certificate()
self.assertEqual("cert", cert)
@mock.patch.object(utils, "get_openstack_ca")
def test_get_openstack_ca_certificate_get_certify(self, mock_ca):
mock_ca.return_value = None
cert = app_creds.get_openstack_ca_certificate()
self.assertIsNotNone(cert)
@mock.patch.object(clients, "OpenStackClients")
def test_create_app_cred(self, mock_client):
mock_client().cinder_region_name.return_value = "cinder"
mock_client().url_for.return_value = "http://keystone"
mock_app_cred = mock_client().keystone().client.application_credentials
app_cred = collections.namedtuple("appcred", ["id", "secret"])
mock_app_cred.create.return_value = app_cred("id", "pass")
app_cred = app_creds._create_app_cred("context", self.cluster_obj)
expected = {
"clouds": {
"openstack": {
"auth": {
"application_credential_id": "id",
"application_credential_secret": "pass",
"auth_url": "http://keystone",
},
"identity_api_version": 3,
"interface": "public",
"region_name": "cinder",
"verify": True,
}
}
}
self.assertEqual(expected, app_cred)
mock_client().url_for.assert_called_once_with(
service_type="identity", interface="public"
)
mock_app_cred.create.assert_called_once_with(
user="fake_user",
name=f"magnum-{self.cluster_obj.uuid}",
description=f"Magnum cluster ({self.cluster_obj.uuid})",
)
@mock.patch.object(app_creds, "_create_app_cred")
def test_get_app_cred_yaml(self, mock_create):
mock_create.return_value = {
"clouds": {
"openstack": {"auth": {"application_credential_id": "id"}}
}
}
app_cred = app_creds.get_app_cred_yaml("context", "cluster")
expected = """\
clouds:
openstack:
auth:
application_credential_id: id
"""
self.assertEqual(expected, app_cred)

View File

@ -12,9 +12,13 @@
from unittest import mock
from magnum.common import exception
from magnum.common.x509 import operations as x509
from magnum.conductor.handlers.common import cert_manager
from magnum import conf
from magnum.drivers.cluster_api import app_creds
from magnum.drivers.cluster_api import driver
from magnum.drivers.cluster_api import helm
from magnum.drivers.cluster_api import kubernetes
from magnum import objects
from magnum.tests.unit.db import base
from magnum.tests.unit.objects import utils as obj_utils
@ -56,6 +60,9 @@ class ClusterAPIDriverTest(base.DbTestCase):
self.assertEqual("magnum-123456f", namespace)
def test_label_return_default(self):
self.cluster_obj.labels = dict()
self.cluster_obj.cluster_template.labels = dict()
result = self.driver._label(self.cluster_obj, "foo", "bar")
self.assertEqual("bar", result)
@ -192,14 +199,22 @@ class ClusterAPIDriverTest(base.DbTestCase):
self.assertEqual("1.42.0", version)
@mock.patch.object(driver.Driver, "_ensure_certificate_secrets")
@mock.patch.object(driver.Driver, "_create_appcred_secret")
@mock.patch.object(kubernetes.Client, "load")
@mock.patch.object(driver.Driver, "_get_image_details")
@mock.patch.object(helm.Client, "install_or_upgrade")
def test_create_cluster(
self,
mock_install,
mock_image,
mock_load,
mock_appcred,
mock_certs,
):
mock_image.return_value = ("imageid1", "1.27.4")
mock_client = mock.MagicMock(spec=kubernetes.Client)
mock_load.return_value = mock_client
self.cluster_obj.keypair = "kp1"
@ -243,6 +258,120 @@ class ClusterAPIDriverTest(base.DbTestCase):
version=CONF.capi_driver.helm_chart_version,
namespace="magnum-fakeproject",
)
mock_client.ensure_namespace.assert_called_once_with(
"magnum-fakeproject"
)
mock_appcred.assert_called_once_with(self.context, self.cluster_obj)
mock_certs.assert_called_once_with(self.context, self.cluster_obj)
@mock.patch.object(app_creds, "get_app_cred_yaml")
@mock.patch.object(app_creds, "get_openstack_ca_certificate")
@mock.patch.object(kubernetes.Client, "load")
def test_create_appcred_secret(self, mock_load, mock_cert, mock_yaml):
mock_client = mock.MagicMock(spec=kubernetes.Client)
mock_load.return_value = mock_client
mock_cert.return_value = "ca"
mock_yaml.return_value = "appcred"
self.driver._create_appcred_secret(self.context, self.cluster_obj)
uuid = self.cluster_obj.uuid
mock_client.apply_secret.assert_called_once_with(
"cluster-example-a-111111111111-cloud-credentials",
{
"metadata": {
"labels": {
"magnum.openstack.org/project-id": "fake_project",
"magnum.openstack.org/user-id": "fake_user",
"magnum.openstack.org/cluster-uuid": uuid,
}
},
"stringData": {"cacert": "ca", "clouds.yaml": "appcred"},
},
"magnum-fakeproject",
)
@mock.patch.object(cert_manager, "get_cluster_magnum_cert")
@mock.patch.object(cert_manager, "get_cluster_ca_certificate")
@mock.patch.object(driver.Driver, "_decode_key")
@mock.patch.object(driver.Driver, "_decode_cert")
@mock.patch.object(driver.Driver, "_k8s_resource_labels")
@mock.patch.object(kubernetes.Client, "load")
def test_ensure_certificate_secrets(
self, mock_load, mock_labels, mock_cert, mock_key, mock_ca, mock_mag
):
mock_client = mock.MagicMock(spec=kubernetes.Client)
mock_load.return_value = mock_client
mock_labels.return_value = dict(foo="bar")
# TODO(johngarbutt): use side effects here?
mock_cert.return_value = "cert1"
mock_key.return_value = "key1"
mock_ca.return_value = "cert_mgr_ca"
mock_mag.return_value = "cert_mag"
self.driver._ensure_certificate_secrets(self.context, self.cluster_obj)
mock_client.apply_secret.assert_has_calls(
[
mock.call(
"cluster-example-a-111111111111-ca",
{
"metadata": {"labels": {"foo": "bar"}},
"type": "cluster.x-k8s.io/secret",
"stringData": {"tls.crt": "cert1", "tls.key": "key1"},
},
"magnum-fakeproject",
),
mock.call(
"cluster-example-a-111111111111-etcd",
{
"metadata": {"labels": {"foo": "bar"}},
"type": "cluster.x-k8s.io/secret",
"stringData": {"tls.crt": "cert1", "tls.key": "key1"},
},
"magnum-fakeproject",
),
mock.call(
"cluster-example-a-111111111111-proxy",
{
"metadata": {"labels": {"foo": "bar"}},
"type": "cluster.x-k8s.io/secret",
"stringData": {"tls.crt": "cert1", "tls.key": "key1"},
},
"magnum-fakeproject",
),
mock.call(
"cluster-example-a-111111111111-sa",
{
"metadata": {"labels": {"foo": "bar"}},
"type": "cluster.x-k8s.io/secret",
"stringData": {"tls.crt": "cert1", "tls.key": "key1"},
},
"magnum-fakeproject",
),
]
)
# TODO(johngarbutt): assert more calls for the other mocks here
def test_decode_cert(self):
mock_cert = mock.MagicMock()
mock_cert.get_certificate.return_value = "cert"
result = self.driver._decode_cert(mock_cert)
self.assertEqual("cert", result)
@mock.patch.object(x509, "decrypt_key")
def test_decode_key(self, mock_decrypt):
mock_cert = mock.MagicMock()
mock_cert.get_private_key.return_value = "private"
mock_cert.get_private_key_passphrase.return_value = "pass"
mock_decrypt.return_value = "foo"
result = self.driver._decode_key(mock_cert)
self.assertEqual("foo", result)
mock_decrypt.assert_called_once_with("private", "pass")
@mock.patch.object(helm.Client, "uninstall_release")
def test_delete_cluster(self, mock_uninstall):

View File

@ -0,0 +1,160 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import os
import pathlib
import tempfile
from unittest import mock
import yaml
import requests
from magnum.drivers.cluster_api import kubernetes
from magnum.tests import base
TEST_SERVER = "https://test:6443"
TEST_KUBECONFIG_YAML = f"""\
apiVersion: v1
clusters:
- cluster:
certificate-authority: "cafile"
server: {TEST_SERVER}
name: default
contexts:
- context:
cluster: default
user: default
name: default
current-context: default
kind: Config
users:
- name: default
user:
client-certificate: "certfile"
client-key: "keyfile"
"""
TEST_KUBECONFIG = yaml.safe_load(TEST_KUBECONFIG_YAML)
class TestKubernetesClient(base.TestCase):
def test_file_or_data(self):
data = kubernetes.file_or_data(dict(key="mydata"), "key")
self.assertEqual("mydata", data)
@mock.patch.object(tempfile, "NamedTemporaryFile")
def test_file_or_data_create_temp(self, mock_temp):
data = kubernetes.file_or_data(
{"key-data": base64.b64encode(b"mydata").decode("utf-8")}, "key"
)
mock_temp.assert_has_calls(
[
mock.call(delete=False),
mock.call().__enter__(),
mock.call().__enter__().write(b"mydata"),
mock.call().__exit__(None, None, None),
]
)
self.assertEqual(mock_temp().__enter__().name, data)
def test_file_or_data_missing(self):
data = kubernetes.file_or_data(dict(), "key")
self.assertIsNone(data)
def test_client_constructor(self):
client = kubernetes.Client(TEST_KUBECONFIG)
self.assertEqual(TEST_SERVER, client.server)
self.assertEqual("cafile", client.verify)
self.assertEqual(("certfile", "keyfile"), client.cert)
def test_get_kubeconfig_path_default(self):
self.assertEqual(
pathlib.Path.home() / ".kube" / "config",
kubernetes.Client._get_kubeconfig_path(),
)
def test_get_kubeconfig_path_config(self):
os.environ["KUBECONFIG"] = "bar"
self.config(kubeconfig_file="foo", group="capi_driver")
path = kubernetes.Client._get_kubeconfig_path()
del os.environ["KUBECONFIG"]
self.assertEqual("foo", path)
def test_get_kubeconfig_path_env(self):
os.environ["KUBECONFIG"] = "bar"
path = kubernetes.Client._get_kubeconfig_path()
del os.environ["KUBECONFIG"]
self.assertEqual("bar", path)
@mock.patch(
"builtins.open",
new_callable=mock.mock_open,
read_data=TEST_KUBECONFIG_YAML,
)
def test_client_load(self, mock_open):
self.config(kubeconfig_file="mypath", group="capi_driver")
client = kubernetes.Client.load()
self.assertEqual(TEST_SERVER, client.server)
mock_open.assert_called_once_with("mypath")
@mock.patch.object(requests.Session, "request")
def test_ensure_namespace(self, mock_request):
client = kubernetes.Client(TEST_KUBECONFIG)
client.ensure_namespace("namespace1")
mock_request.assert_called_once_with(
"PATCH",
"https://test:6443/api/v1/namespaces/namespace1",
data=None,
json={
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {"name": "namespace1"},
},
headers={"Content-Type": "application/apply-patch+yaml"},
params={"fieldManager": "magnum", "force": "true"},
)
@mock.patch.object(requests.Session, "request")
def test_apply_secret(self, mock_request):
client = kubernetes.Client(TEST_KUBECONFIG)
test_data = dict(
stringData=dict(foo="bar"), metadata=dict(labels=dict(baz="asdf"))
)
client.apply_secret("secname", test_data, "ns1")
mock_request.assert_called_once_with(
"PATCH",
"https://test:6443/api/v1/namespaces/ns1/secrets/secname",
data=None,
json={
"stringData": {"foo": "bar"},
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"labels": {"baz": "asdf"},
"name": "secname",
"namespace": "ns1",
},
},
headers={"Content-Type": "application/apply-patch+yaml"},
params={"fieldManager": "magnum", "force": "true"},
)