Support token based kubeconfig files for the capi management cluster

Token based kubeconfig do not have an expiry date unlike default
client-based kubeconfigs, so supporting both is useful

Change-Id: I00fa56cae04e1b1efe3e5c4cecc614647ba9d5e6
Signed-off-by: Jasleen <jasleen@stackhpc.com>
This commit is contained in:
Jasleen
2025-07-02 14:29:50 +01:00
parent 60dc96c4da
commit ff7a57ceaf
3 changed files with 65 additions and 48 deletions
+22
View File
@@ -27,6 +27,28 @@ file lives::
[capi_helm]
kubeconfig_file = /etc/magnum/kubeconfig
.. note::
We currently support two authentication methods to access
the Cluster API management cluster:
- Certificate-Based Authentication:
This method uses a client certificate and private key
embedded in the kubeconfig
to authenticate with the Kubernetes API server.
It is the default method used by Cluster API when
generating the kubeconfig Secret for workload clusters.
- Token-Based Authentication:
A Kubernetes ServiceAccount is bound to a ClusterRole,
and its associated token is used to authenticate to the cluster.
Tokens generated via Kubernetes service accounts are long-lived and,
by default, do not expire unless explicitly configured with an expiration time.
They are thus useful where persistent and uninterrupted access to the cluster
is required unlike client certificate-based kubeconfigs,
with a one-year validity and periodic manual regeneration.
Once the driver installation is complete, to create a cluster you
first need an image that has been built to include Kubernetes.
There are community-maintained packer build pipelines here:
+37 -42
View File
@@ -28,32 +28,6 @@ LOG = logging.getLogger(__name__)
CONF = conf.CONF
def ensure_file_cert(obj, file_key):
"""Returns the path and cleanup requirements of cert.
Returns a tuple containing the path to a file with the requesteddata,
and whether the file must be cleaned up.
First check if there is a file path already,
if the data is there, put it in a file,
remember for cleanup and return the created file.
"""
if file_key in obj:
return obj[file_key], False
data_key = file_key + "-data"
if data_key in obj:
# NOTE(dalees): The created file may contain private key material
# but is owned by the current user and mode 0600.
# See also: tempfile.mkstemp
# In Python 3.12, cleanup may be improved with
# delete=True,delete_on_close=False
with tempfile.NamedTemporaryFile(delete=False) as fd:
fd.write(base64.standard_b64decode(obj[data_key]))
return fd.name, True
return None, False
class Client(requests.Session):
"""Object for producing Kubernetes clients."""
@@ -65,27 +39,48 @@ class Client(requests.Session):
cluster, user = self._get_cluster_and_user(kubeconfig)
self.server = cluster["server"].rstrip("/")
ca_file, cleanup_file = ensure_file_cert(
cluster, "certificate-authority"
)
if cleanup_file and ca_file:
self._tempfiles.append(ca_file)
ca_file = self.ensure_file_cert(cluster, "certificate-authority")
if ca_file:
self.verify = ca_file
# convert certs into files as required by requests
# https://requests.readthedocs.io/en/latest/api/#requests.Session.cert
client_cert, cleanup_file = ensure_file_cert(
user, "client-certificate"
)
if cleanup_file and client_cert:
self._tempfiles.append(client_cert)
assert client_cert is not None
client_key, cleanup_file = ensure_file_cert(user, "client-key")
if cleanup_file and client_key:
self._tempfiles.append(client_key)
assert client_key is not None
self.cert = (client_cert, client_key)
client_cert = self.ensure_file_cert(user, "client-certificate")
client_key = self.ensure_file_cert(user, "client-key")
if client_cert and client_key:
self.cert = (client_cert, client_key)
elif user.get("token"):
self.headers.update({"Authorization": f"Bearer {user['token']}"})
else:
raise Exception(
"No supported authentication method found in kubeconfig"
)
def ensure_file_cert(self, obj, file_key):
"""Returns the path of cert.
Returns a string containing the path to a file with the requesteddata.
First check if there is a file path already,
if the data is there, put it in a file, add path to the _tempfiles
list for cleanup and return the created file.
"""
if file_key in obj:
return obj[file_key]
data_key = file_key + "-data"
if data_key in obj:
# NOTE(dalees): The created file may contain private key material
# but is owned by the current user and mode 0600.
# See also: tempfile.mkstemp
# In Python 3.12, cleanup may be improved with
# delete=True,delete_on_close=False
with tempfile.NamedTemporaryFile(delete=False) as fd:
fd.write(base64.standard_b64decode(obj[data_key]))
self._tempfiles.append(fd.name)
return fd.name
return None
def __del__(self):
# Remove any temporary certificate files this class owns.
+6 -6
View File
@@ -49,14 +49,15 @@ TEST_KUBECONFIG = yaml.safe_load(TEST_KUBECONFIG_YAML)
class TestKubernetesClient(base.TestCase):
# Basic lookup, non "-data" key
def test_file_or_data(self):
data, cleanup = kubernetes.ensure_file_cert(dict(key="mydata"), "key")
client = kubernetes.Client(TEST_KUBECONFIG)
data = client.ensure_file_cert(dict(key="mydata"), "key")
self.assertEqual("mydata", data)
self.assertFalse(cleanup)
# Lookup with a "-data" key, requiring temporary file
@mock.patch.object(tempfile, "NamedTemporaryFile")
def test_file_or_data_create_temp(self, mock_temp):
data, cleanup = kubernetes.ensure_file_cert(
client = kubernetes.Client(TEST_KUBECONFIG)
data = client.ensure_file_cert(
{"key-data": base64.b64encode(b"mydata").decode("utf-8")}, "key"
)
mock_temp.assert_has_calls(
@@ -68,13 +69,12 @@ class TestKubernetesClient(base.TestCase):
]
)
self.assertEqual(mock_temp().__enter__().name, data)
self.assertTrue(cleanup)
# Lookup with no key, expecting no error, and no data returned.
def test_file_or_data_missing(self):
data, cleanup = kubernetes.ensure_file_cert(dict(), "key")
client = kubernetes.Client(TEST_KUBECONFIG)
data = client.ensure_file_cert(dict(), "key")
self.assertIsNone(data)
self.assertFalse(cleanup)
def test_client_constructor(self):
client = kubernetes.Client(TEST_KUBECONFIG)