Introduce driver for openshift virtualmachines

With openshift virtualization it makes sense to integrate this resource
type with nodepool. This change proposes a new driver to create and delete
virtualmachines dynamically using a VirtualMachine manifest template
along with a service exposing an ssh endpoint. The template file needs
to be mounted into the deployment and referenced in the nodepool config
via the openshift labels.

Both the virtualmachine and service will be created on launch and
deleted during cleanup.

Change-Id: Ibd8a24edf7d35c079363b2110a65350029abe3ad
This commit is contained in:
Per Wiklund 2023-01-19 15:29:14 +01:00
parent 2785d3bd20
commit 1fd031bf38
6 changed files with 369 additions and 8 deletions

View File

@ -16,6 +16,7 @@
import math
import voluptuous as v
import yaml
from nodepool.driver import ConfigPool
from nodepool.driver import ConfigValue
@ -43,15 +44,22 @@ class OpenshiftPool(ConfigPool):
pl = OpenshiftLabel()
pl.name = label['name']
pl.type = label['type']
pl.image = label.get('image')
pl.image_pull = label.get('image-pull', 'IfNotPresent')
pl.image_pull_secrets = label.get('image-pull-secrets', [])
pl.cpu = label.get('cpu')
pl.memory = label.get('memory')
pl.username = label.get('username')
pl.python_path = label.get('python-path', 'auto')
pl.shell_type = label.get('shell-type')
pl.env = label.get('env', [])
pl.node_selector = label.get('node-selector')
if pl.type == 'vm':
manifest_file = label.get('manifest_from_file', '')
with open(manifest_file) as f:
manifest = list(yaml.safe_load_all(f.read()))
pl.manifest = manifest
else:
pl.image = label.get('image')
pl.image_pull = label.get('image-pull', 'IfNotPresent')
pl.image_pull_secrets = label.get('image-pull-secrets', [])
pl.cpu = label.get('cpu')
pl.memory = label.get('memory')
pl.env = label.get('env', [])
pl.node_selector = label.get('node-selector')
pl.pool = self
self.labels[pl.name] = pl
full_config.labels[label['name']].pools.append(self)

View File

@ -249,6 +249,46 @@ class OpenshiftProvider(Provider, QuotaSupport):
self.k8s_client.create_namespaced_pod(project, pod_body)
def createVm(self, project, vm_name, label):
    """Create a KubeVirt VirtualMachine in the given project.

    The first document of the label's manifest is used as the vm body.
    A ``vm`` label carrying the generated name is injected into the pod
    template metadata so the matching Service can select this vm.

    :param str project: the openshift project (namespace) to create in
    :param str vm_name: unique name for the new virtualmachine
    :param label: the pool label holding the parsed manifest documents
    """
    import copy
    self.log.debug("%s: creating vm in project %s" % (vm_name, project))
    # Deep-copy the template so repeated launches never mutate the
    # manifest shared on the label object.
    body = copy.deepcopy(label.manifest[0])
    body['metadata']['name'] = vm_name
    # Ensure spec.template.metadata.labels exists, then tag it with the
    # vm name; setdefault collapses the original nested if/else chains.
    metadata = body['spec']['template'].setdefault('metadata', dict())
    metadata.setdefault('labels', dict())['vm'] = vm_name
    self.custom_client.create_namespaced_custom_object(
        group='kubevirt.io', plural='virtualmachines', version='v1',
        namespace=project, body=body)
def createService(self, project, vm_name, label):
    """Create the ssh Service for a vm and wait for its ingress IP.

    The second document of the label's manifest is used as the Service
    body; its selector is pointed at the vm and the service is labeled
    with the vm name so cleanup can find it later.

    :param str project: the openshift project (namespace)
    :param str vm_name: the virtualmachine the service exposes
    :param label: the pool label holding the parsed manifest documents
    :returns: (ssh_port, ssh_endpoint) once the ingress IP is available
    :raises exceptions.LaunchNodepoolException: if no loadbalancer
        ingress IP appears within 300 seconds
    """
    import copy
    # Deep-copy the template so the shared manifest is never mutated.
    body = copy.deepcopy(label.manifest[1])
    body['spec']['selector']['vm'] = vm_name
    service_name = "%s-%s" % (vm_name, body['metadata']['name'])
    body['metadata']['name'] = service_name
    body['metadata'].setdefault('labels', dict())['vm'] = vm_name
    self.k8s_client.create_namespaced_service(project, body)
    for retry in range(300):
        service = self.k8s_client.read_namespaced_service(
            service_name, project)
        try:
            ssh_endpoint = service.status.load_balancer.ingress[0].ip
            if ssh_endpoint:
                ssh_port = service.spec.ports[0].port
                self.log.debug("loadbalancer ingress IP is %s",
                               ssh_endpoint)
                return ssh_port, ssh_endpoint
            self.log.debug("loadbalancer ingress IP not found. Retrying.")
        except TypeError:
            # status.load_balancer.ingress is None until the LB is
            # provisioned; indexing it raises TypeError.
            self.log.debug("Failure reading status from service. Retrying.")
        time.sleep(1)
    # Previously the loop fell through and implicitly returned None,
    # which crashed the caller on tuple-unpack; fail explicitly instead.
    raise exceptions.LaunchNodepoolException(
        "%s: service %s never received a loadbalancer ingress IP" % (
            project, service_name))
def waitForPod(self, project, pod_name):
    """Wait up to 300s for the pod to reach the Running phase.

    :param str project: the openshift project (namespace)
    :param str pod_name: the pod to poll
    :returns: the name of the cluster node hosting the pod
    :raises exceptions.LaunchNodepoolException: on timeout
    """
    for retry in range(300):
        pod = self.k8s_client.read_namespaced_pod(pod_name, project)
        # NOTE(review): the phase check was hidden by the diff hunk
        # boundary; 'Running' matches the surrounding status logging.
        if pod.status.phase == 'Running':
            break
        self.log.debug("%s: pod status is %s", project, pod.status.phase)
        time.sleep(1)
    else:
        # for/else replaces the stale `if retry == 299:` check left
        # over in the diff: raise only when the loop exhausts.
        raise exceptions.LaunchNodepoolException(
            "%s: pod failed to initialize (%s)" % (
                project, pod.status.phase))
    return pod.spec.node_name
def waitForVm(self, project, vm_name):
    """Wait up to 300s for the virtualmachine to reach Running.

    :param str project: the openshift project (namespace)
    :param str vm_name: the virtualmachine to poll
    :returns: the vm's metadata name on success
    :raises exceptions.LaunchNodepoolException: on timeout
    """
    for retry in range(300):
        vm = self.custom_client.get_namespaced_custom_object(
            group='kubevirt.io', plural='virtualmachines', version='v1',
            namespace=project, name=vm_name)
        # printableStatus may be absent while kubevirt initializes.
        status = vm.get('status', {}).get('printableStatus')
        if status == "Running":
            break
        if status is None:
            self.log.debug(
                "Retrying after failure reading printableStatus from vm: %s",
                vm['metadata']['name'])
        else:
            self.log.debug("%s: vm status is %s", project, status)
        time.sleep(1)
    else:
        # Use .get() here too: a vm that never published a status must
        # raise the launch error, not a masking KeyError.
        raise exceptions.LaunchNodepoolException(
            "%s: vm failed to initialize (%s)" % (
                project, vm.get('status', {}).get('printableStatus')))
    return vm['metadata']['name']
def getRequestHandler(self, poolworker, request):
    """Build the node-request handler servicing *request*."""
    request_handler = handler.OpenshiftNodeRequestHandler(
        poolworker, request)
    return request_handler

View File

@ -0,0 +1,42 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from kubernetes import config as k8s_config
from nodepool.driver import Driver
from nodepool.driver.openshiftvms.config import OpenshiftVmsProviderConfig
from nodepool.driver.openshiftvms.provider import OpenshiftVmsProvider
class OpenshiftVmsDriver(Driver):
    """Driver entry point for the openshift virtualmachines provider."""

    def __init__(self):
        super().__init__()

    def reset(self):
        """(Re)load the local kube config, tolerating its absence."""
        try:
            k8s_config.load_kube_config(persist_config=True)
        except FileNotFoundError:
            # No kube config file at all is acceptable.
            pass
        except k8s_config.config_exception.ConfigException as e:
            # An empty config is fine; anything else is a genuine
            # configuration problem and must propagate.
            if 'Invalid kube-config file. No configuration found.' not in str(e):
                raise

    def getProviderConfig(self, provider):
        """Return the parsed provider configuration object."""
        return OpenshiftVmsProviderConfig(self, provider)

    def getProvider(self, provider_config):
        """Return a provider instance for *provider_config*."""
        return OpenshiftVmsProvider(provider_config)

View File

@ -0,0 +1,67 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import voluptuous as v
from nodepool.driver import ConfigPool
from nodepool.driver.openshift.config import OpenshiftPool
from nodepool.driver.openshift.config import OpenshiftProviderConfig
class OpenshiftVmsProviderConfig(OpenshiftProviderConfig):
    """Provider configuration for the openshift virtualmachines driver."""

    def __eq__(self, other):
        """Configs are equal when the parent, context and pools match."""
        if not isinstance(other, OpenshiftVmsProviderConfig):
            return False
        return (super().__eq__(other)
                and self.context == other.context
                and self.pools == other.pools)

    def load(self, config):
        """Parse this provider's section of the nodepool config."""
        provider = self.provider
        self.launch_retries = int(provider.get('launch-retries', 3))
        self.context = provider['context']
        self.max_resources = provider.get('max-resources', math.inf)
        for pool_config in provider.get('pools', []):
            new_pool = OpenshiftPool()
            new_pool.load(pool_config, config)
            new_pool.provider = self
            self.pools[new_pool.name] = new_pool

    def getSchema(self):
        """Return the voluptuous schema validating this provider."""
        openshift_label = {
            v.Required('name'): str,
            'manifest_from_file': str,
            'manifest': dict,
            'type': str,
            'python-path': str,
            'shell-type': str,
            'username': str,
        }

        pool = ConfigPool.getCommonSchemaDict()
        pool.update({
            v.Required('name'): str,
            v.Required('labels'): [openshift_label],
        })

        schema = OpenshiftProviderConfig.getCommonSchemaDict()
        schema.update({
            v.Required('pools'): [pool],
            v.Required('context'): str,
            'launch-retries': int,
            'max-resources': int,
        })
        return v.Schema(schema)

View File

@ -0,0 +1,61 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from nodepool.zk import zookeeper as zk
from nodepool.driver.openshift.handler import OpenshiftLauncher
from nodepool.driver.openshift.handler import OpenshiftNodeRequestHandler
from nodepool.nodeutils import nodescan
class OpenshiftVmLauncher(OpenshiftLauncher):
    """Launcher creating a kubevirt VirtualMachine plus its ssh Service."""

    def _launchLabel(self):
        """Create the vm and its service, then mark the node READY.

        The external id is stored in zookeeper before waiting on the vm
        so a failed launch can still be cleaned up later.
        """
        self.log.debug("Creating resource")
        vm_name = "%s-%s" % (self.label.name, self.node.id)
        project = self.handler.pool.name
        self.handler.manager.createVm(project, vm_name, self.label)
        self.node.external_id = "%s-%s" % (project, vm_name)
        self.zk.storeNode(self.node)
        vm_node_id = self.handler.manager.waitForVm(project, vm_name)
        ssh_port, ssh_endpoint = self.handler.manager.createService(
            project, vm_name, self.label)
        self.node.state = zk.READY
        self.node.interface_ip = ssh_endpoint
        self.node.hostname = vm_node_id
        self.node.username = self.label.username
        self.node.connection_port = ssh_port
        self.node.host_keys = nodescan(ssh_endpoint,
                                       self.node.connection_port,
                                       timeout=100)
        self.node.python_path = self.label.python_path
        self.node.shell_type = self.label.shell_type
        self.node.connection_type = 'ssh'
        self.node.cloud = self.provider_config.context
        self.node.host_id = self.node.id
        self.zk.storeNode(self.node)
        # Lazy %-args instead of eager interpolation in the log call.
        self.log.info("Virtualmachine %s is ready", self.node.external_id)
class OpenshiftVmRequestHandler(OpenshiftNodeRequestHandler):
    """Node request handler launching kubevirt virtualmachines."""

    log = logging.getLogger("nodepool.driver.openshiftvms."
                            "OpenshiftVmRequestHandler")

    def hasRemainingQuota(self, node_types):
        """Return True while one more vm fits under max-resources."""
        in_use = len(self.manager.listNodes())
        return in_use + 1 <= self.provider.max_resources

    def launch(self, node):
        """Start a launcher thread for *node* and track it."""
        label = self.pool.labels[node.type[0]]
        launcher = OpenshiftVmLauncher(self, node, self.provider, label)
        launcher.start()
        self._threads.append(launcher)

View File

@ -0,0 +1,127 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import urllib3
import time
from kubernetes.client.api import CustomObjectsApi
from nodepool.driver.utils_k8s import get_client
from nodepool.driver.utils import NodeDeleter
from nodepool.driver.openshift.provider import OpenshiftProvider
from nodepool.driver.openshiftvms import handler
urllib3.disable_warnings()
class OpenshiftVmsProvider(OpenshiftProvider):
    """Provider managing kubevirt VirtualMachines in openshift projects."""

    log = logging.getLogger("nodepool.driver.openshiftvms."
                            "OpenshiftVmsProvider")

    def __init__(self, provider, *args):
        """Set up the kubernetes clients for *provider*.

        NOTE(review): deliberately does not call super().__init__();
        confirm the parent's setup really should be skipped.
        """
        self.provider = provider
        self.ready = False
        _, _, self.k8s_client, self.custom_client = get_client(
            self.log, provider.context, CustomObjectsApi)
        # Label names double as vm-name prefixes and are used to
        # recognize which vms in a project this provider manages.
        self.vm_names = set()
        for pool in provider.pools.values():
            self.vm_names.update(pool.labels.keys())

    def start(self, zk_conn):
        """Mark the provider ready once a working client exists."""
        self.log.debug("Starting")
        self._zk = zk_conn
        if self.ready or not self.custom_client:
            return
        self.ready = True

    def listNodes(self):
        """Return server-like records for every vm in the pools."""
        servers = []

        class FakeServer:
            """Adapt a vm manifest to the duck type QuotaSupport expects."""

            def __init__(self, pool, vm, provider, valid_names):
                self.id = vm['metadata']['name']
                self.name = self.id
                self.metadata = {}

                if any(vm['metadata']['name'].startswith("%s-" % valid_name)
                       for valid_name in valid_names):
                    node_id = vm['metadata']['name'].split('-')[-1]
                    try:
                        # Make sure last component of name is an id
                        int(node_id)
                        self.metadata['nodepool_provider_name'] = provider
                        self.metadata['nodepool_node_id'] = node_id
                    except ValueError:
                        # Probably not a managed vm, skip the metadata
                        pass

            def get(self, name, default=None):
                return getattr(self, name, default)

        if self.ready:
            for pool in self.provider.pools.keys():
                vms = self.custom_client.list_namespaced_custom_object(
                    group='kubevirt.io', plural='virtualmachines',
                    version='v1', namespace=pool)['items']
                for vm in vms:
                    servers.append(FakeServer(
                        pool, vm, self.provider.name, self.vm_names))
        return servers

    def getProjectVmName(self, server_id):
        """Split an external id back into (project, vm_name).

        :returns: (None, None) when no pool prefix matches.
        """
        for pool in self.provider.pools.keys():
            if server_id.startswith("%s-" % pool):
                vm_name = server_id[len(pool) + 1:]
                return pool, vm_name
        return None, None

    def startNodeCleanup(self, node):
        """Spawn and return the deleter thread for *node*."""
        t = NodeDeleter(self._zk, self, node)
        t.start()
        return t

    def cleanupNode(self, server_id):
        """Delete the vm and its service, best effort."""
        if not self.ready:
            return
        # Look for pool name
        project_name, vm_name = self.getProjectVmName(server_id)
        if not project_name:
            # log.error, not log.exception: there is no active
            # exception here, so exception() would log a bogus trace.
            self.log.error("%s: unknown pool" % server_id)
            return
        self.log.debug("%s: removing vm" % vm_name)
        try:
            self.custom_client.delete_namespaced_custom_object(
                group='kubevirt.io', plural='virtualmachines',
                version='v1', name=vm_name, namespace=project_name)
            self.log.info("%s: vm removed" % server_id)
        except Exception:
            # TODO: implement better exception handling
            self.log.exception("Couldn't remove vm %s" % server_id)
        self.log.debug("%s: removing service" % vm_name)
        # Pre-bind so the except clause can't hit a NameError when the
        # service lookup itself is what raised.
        service_name = None
        try:
            service = self.k8s_client.list_namespaced_service(
                project_name,
                label_selector="vm={}".format(vm_name)).items[0]
            service_name = service.metadata.name
            self.k8s_client.delete_namespaced_service(
                service_name, project_name)
            self.log.info("%s: service removed" % service_name)
        except Exception:
            self.log.exception("Couldn't remove service %s" % service_name)

    def waitForNodeCleanup(self, server_id):
        """Block until the vm can no longer be read (or 300s elapse)."""
        project_name, vm_name = self.getProjectVmName(server_id)
        for retry in range(300):
            try:
                self.custom_client.get_namespaced_custom_object(
                    group='kubevirt.io', plural='virtualmachines',
                    version='v1', name=vm_name, namespace=project_name)
            except Exception:
                # Any read failure is treated as "vm is gone".
                break
            time.sleep(1)

    def getRequestHandler(self, poolworker, request):
        """Return the vm-aware node request handler."""
        return handler.OpenshiftVmRequestHandler(poolworker, request)