Introduce driver for openshift virtualmachines
With OpenShift Virtualization it makes sense to integrate this resource type with nodepool. This change proposes a new driver to create and delete virtual machines dynamically, using a VirtualMachine manifest template along with a service exposing an SSH endpoint. The template file needs to be mounted into the deployment and referenced in the nodepool config via the openshift labels. Both the virtual machine and the service are created on launch and deleted during cleanup. Change-Id: Ibd8a24edf7d35c079363b2110a65350029abe3ad
This commit is contained in:
parent
2785d3bd20
commit
521846fd51
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
import math
|
import math
|
||||||
import voluptuous as v
|
import voluptuous as v
|
||||||
|
import yaml
|
||||||
|
|
||||||
from nodepool.driver import ConfigPool
|
from nodepool.driver import ConfigPool
|
||||||
from nodepool.driver import ConfigValue
|
from nodepool.driver import ConfigValue
|
||||||
|
@ -43,15 +44,22 @@ class OpenshiftPool(ConfigPool):
|
||||||
pl = OpenshiftLabel()
|
pl = OpenshiftLabel()
|
||||||
pl.name = label['name']
|
pl.name = label['name']
|
||||||
pl.type = label['type']
|
pl.type = label['type']
|
||||||
pl.image = label.get('image')
|
pl.username = label.get('username')
|
||||||
pl.image_pull = label.get('image-pull', 'IfNotPresent')
|
|
||||||
pl.image_pull_secrets = label.get('image-pull-secrets', [])
|
|
||||||
pl.cpu = label.get('cpu')
|
|
||||||
pl.memory = label.get('memory')
|
|
||||||
pl.python_path = label.get('python-path', 'auto')
|
pl.python_path = label.get('python-path', 'auto')
|
||||||
pl.shell_type = label.get('shell-type')
|
pl.shell_type = label.get('shell-type')
|
||||||
pl.env = label.get('env', [])
|
if pl.type == 'vm':
|
||||||
pl.node_selector = label.get('node-selector')
|
manifest_file = label.get('manifest_from_file', '')
|
||||||
|
with open(manifest_file) as f:
|
||||||
|
manifest = list(yaml.safe_load_all(f.read()))
|
||||||
|
pl.manifest = manifest
|
||||||
|
else:
|
||||||
|
pl.image = label.get('image')
|
||||||
|
pl.image_pull = label.get('image-pull', 'IfNotPresent')
|
||||||
|
pl.image_pull_secrets = label.get('image-pull-secrets', [])
|
||||||
|
pl.cpu = label.get('cpu')
|
||||||
|
pl.memory = label.get('memory')
|
||||||
|
pl.env = label.get('env', [])
|
||||||
|
pl.node_selector = label.get('node-selector')
|
||||||
pl.pool = self
|
pl.pool = self
|
||||||
self.labels[pl.name] = pl
|
self.labels[pl.name] = pl
|
||||||
full_config.labels[label['name']].pools.append(self)
|
full_config.labels[label['name']].pools.append(self)
|
||||||
|
|
|
@ -249,6 +249,46 @@ class OpenshiftProvider(Provider, QuotaSupport):
|
||||||
|
|
||||||
self.k8s_client.create_namespaced_pod(project, pod_body)
|
self.k8s_client.create_namespaced_pod(project, pod_body)
|
||||||
|
|
||||||
|
def createVm(self, project, vm_name, label):
    """Create a kubevirt VirtualMachine in the given project.

    The first document of the label's manifest template is used as the
    VM body; its metadata name is set to ``vm_name`` and a ``vm`` label
    is attached to the pod template so the matching ssh Service
    selector (``vm=<vm_name>``) can find the pod.

    :param str project: the openshift project (namespace) to create in
    :param str vm_name: unique name for the new virtualmachine
    :param label: the pool label providing the manifest template
    """
    self.log.debug("%s: creating vm in project %s" % (vm_name, project))
    # NOTE(review): the manifest dict is shared by every launch of this
    # label and is mutated in place; the name and vm label are fully
    # overwritten on each call so this is safe today, but a deep copy
    # would be more defensive -- confirm before reusing the template.
    body = label.manifest[0]
    body['metadata']['name'] = vm_name
    # Ensure spec.template.metadata.labels exists, then tag the pod
    # template (replaces the previous three-way nested if/else).
    template = body['spec']['template']
    template.setdefault('metadata', {}).setdefault('labels', {})['vm'] = \
        vm_name
    self.custom_client.create_namespaced_custom_object(
        group='kubevirt.io', plural='virtualmachines', version='v1',
        namespace=project, body=body)
|
||||||
|
|
||||||
|
def createService(self, project, vm_name, label):
    """Create the ssh Service for a vm and wait for its endpoint.

    The second document of the label's manifest template is the Service
    body.  A deep copy is taken because the template dict is shared by
    every launch of this label: mutating it in place made each new
    service name accumulate every previous vm's prefix
    (``vm2-vm1-svc`` instead of ``vm2-svc``).

    :param str project: the openshift project (namespace)
    :param str vm_name: name of the vm the service exposes
    :param label: the pool label providing the manifest template
    :returns: (ssh_port, ssh_endpoint) once the loadbalancer ingress
        address is available
    :raises exceptions.LaunchNodepoolException: when no ingress address
        appears within the retry window
    """
    import copy  # local import; module import block is not in view
    body = copy.deepcopy(label.manifest[1])
    body['spec']['selector']['vm'] = vm_name
    service_name = "%s-%s" % (vm_name, body['metadata']['name'])
    body['metadata']['name'] = service_name
    # Label the service itself so cleanup can find it via vm=<name>.
    body['metadata'].setdefault('labels', {})['vm'] = vm_name
    self.k8s_client.create_namespaced_service(project, body)
    for retry in range(300):
        service = self.k8s_client.read_namespaced_service(
            service_name, project)
        try:
            ssh_endpoint = service.status.load_balancer.ingress[0].ip
            if ssh_endpoint:
                ssh_port = service.spec.ports[0].port
                self.log.debug("loadbalancer ingress IP is %s",
                               ssh_endpoint)
                return ssh_port, ssh_endpoint
            self.log.debug("loadbalancer ingress IP not found. Retrying.")
        except (TypeError, IndexError):
            # ingress is None (TypeError) or an empty list (IndexError)
            # until the loadbalancer is provisioned.
            self.log.debug("Failure reading status from service. Retrying.")
        time.sleep(1)
    # Falling through used to return None, which the caller unpacked
    # into an opaque TypeError; raise a clear launch failure instead.
    raise exceptions.LaunchNodepoolException(
        "%s: service failed to provision an ingress IP" % service_name)
|
||||||
|
|
||||||
def waitForPod(self, project, pod_name):
|
def waitForPod(self, project, pod_name):
|
||||||
for retry in range(300):
|
for retry in range(300):
|
||||||
pod = self.k8s_client.read_namespaced_pod(pod_name, project)
|
pod = self.k8s_client.read_namespaced_pod(pod_name, project)
|
||||||
|
@ -256,12 +296,28 @@ class OpenshiftProvider(Provider, QuotaSupport):
|
||||||
break
|
break
|
||||||
self.log.debug("%s: pod status is %s", project, pod.status.phase)
|
self.log.debug("%s: pod status is %s", project, pod.status.phase)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
if retry == 299:
|
else:
|
||||||
raise exceptions.LaunchNodepoolException(
|
raise exceptions.LaunchNodepoolException(
|
||||||
"%s: pod failed to initialize (%s)" % (
|
"%s: pod failed to initialize (%s)" % (
|
||||||
project, pod.status.phase))
|
project, pod.status.phase))
|
||||||
return pod.spec.node_name
|
return pod.spec.node_name
|
||||||
|
|
||||||
|
def waitForVm(self, project, vm_name):
    """Wait until the named virtualmachine reports Running.

    Polls the kubevirt custom object once a second for up to 300
    seconds.

    :param str project: the openshift project (namespace)
    :param str vm_name: name of the virtualmachine to wait for
    :returns: the vm name once the vm is Running
    :raises exceptions.LaunchNodepoolException: if the vm does not
        reach the Running state in time
    """
    for retry in range(300):
        vm = self.custom_client.get_namespaced_custom_object(
            group='kubevirt.io', plural='virtualmachines', version='v1',
            namespace=project, name=vm_name)
        # status/printableStatus is not populated immediately after
        # creation; .get avoids the KeyError the old code caught.
        status = vm.get('status', {}).get('printableStatus')
        if status == "Running":
            break
        if status is None:
            self.log.debug(
                "Retrying after failure reading printableStatus "
                "from vm: %s", vm['metadata']['name'])
        else:
            self.log.debug("%s: vm status is %s", project, status)
        time.sleep(1)
    else:
        # for/else: only reached when every retry was exhausted.
        # BUG FIX: the old message formatting indexed
        # vm['status']['printableStatus'] directly, which could raise
        # KeyError and mask the real launch failure.
        raise exceptions.LaunchNodepoolException(
            "%s: vm failed to initialize (%s)" % (
                project, vm.get('status', {}).get('printableStatus')))
    return vm['metadata']['name']
|
||||||
|
|
||||||
def getRequestHandler(self, poolworker, request):
|
def getRequestHandler(self, poolworker, request):
|
||||||
return handler.OpenshiftNodeRequestHandler(poolworker, request)
|
return handler.OpenshiftNodeRequestHandler(poolworker, request)
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
# Copyright 2018 Red Hat
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
#
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from kubernetes import config as k8s_config
|
||||||
|
from nodepool.driver import Driver
|
||||||
|
from nodepool.driver.openshiftvms.config import OpenshiftVmsProviderConfig
|
||||||
|
from nodepool.driver.openshiftvms.provider import OpenshiftVmsProvider
|
||||||
|
|
||||||
|
|
||||||
|
class OpenshiftVmsDriver(Driver):
    """Nodepool driver managing kubevirt VirtualMachines on openshift.

    The redundant ``__init__`` that only called ``super().__init__()``
    has been removed; the base class constructor runs regardless.
    """

    def reset(self):
        """(Re)load the kube config, tolerating a missing/empty one.

        Nodepool may run with in-cluster credentials only, in which
        case no kube-config file exists; that is not an error here.
        """
        try:
            k8s_config.load_kube_config(persist_config=True)
        except k8s_config.config_exception.ConfigException as e:
            if 'Invalid kube-config file. No configuration found.' in str(e):
                pass
            else:
                raise
        except FileNotFoundError:
            pass

    def getProviderConfig(self, provider):
        """Return the config object for a provider config section."""
        return OpenshiftVmsProviderConfig(self, provider)

    def getProvider(self, provider_config):
        """Return the provider implementation for the given config."""
        return OpenshiftVmsProvider(provider_config)
|
|
@ -0,0 +1,67 @@
|
||||||
|
# Copyright 2018 Red Hat
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
#
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import math
|
||||||
|
import voluptuous as v
|
||||||
|
|
||||||
|
from nodepool.driver import ConfigPool
|
||||||
|
from nodepool.driver.openshift.config import OpenshiftPool
|
||||||
|
from nodepool.driver.openshift.config import OpenshiftProviderConfig
|
||||||
|
|
||||||
|
|
||||||
|
class OpenshiftVmsProviderConfig(OpenshiftProviderConfig):
    """Provider configuration for the openshift virtualmachine driver."""

    def __eq__(self, other):
        # Only another vms provider config with matching base
        # attributes, context and pools compares equal.
        if not isinstance(other, OpenshiftVmsProviderConfig):
            return False
        return (super().__eq__(other)
                and self.context == other.context
                and self.pools == other.pools)

    def load(self, config):
        """Populate this config object from the parsed provider section."""
        provider = self.provider
        self.launch_retries = int(provider.get('launch-retries', 3))
        self.context = provider['context']
        self.max_resources = provider.get('max-resources', math.inf)
        for pool_config in provider.get('pools', []):
            pool = OpenshiftPool()
            pool.load(pool_config, config)
            pool.provider = self
            self.pools[pool.name] = pool

    def getSchema(self):
        """Return the voluptuous schema validating this provider section."""
        openshift_label = {
            v.Required('name'): str,
            'manifest_from_file': str,
            'manifest': dict,
            'type': str,
            'python-path': str,
            'shell-type': str,
            'username': str,
        }

        pool = ConfigPool.getCommonSchemaDict()
        pool.update({
            v.Required('name'): str,
            v.Required('labels'): [openshift_label],
        })

        schema = OpenshiftProviderConfig.getCommonSchemaDict()
        schema.update({
            v.Required('pools'): [pool],
            v.Required('context'): str,
            'launch-retries': int,
            'max-resources': int,
        })
        return v.Schema(schema)
|
|
@ -0,0 +1,61 @@
|
||||||
|
# Copyright 2018 Red Hat
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from nodepool.zk import zookeeper as zk
|
||||||
|
|
||||||
|
from nodepool.driver.openshift.handler import OpenshiftLauncher
|
||||||
|
from nodepool.driver.openshift.handler import OpenshiftNodeRequestHandler
|
||||||
|
from nodepool.nodeutils import nodescan
|
||||||
|
|
||||||
|
class OpenshiftVmLauncher(OpenshiftLauncher):
    """Launcher thread creating a single kubevirt virtualmachine."""

    def _launchLabel(self):
        """Create the vm and its ssh service, then mark the node READY."""
        self.log.debug("Creating resource")
        project = self.handler.pool.name
        vm_name = "%s-%s" % (self.label.name, self.node.id)

        self.handler.manager.createVm(project, vm_name, self.label)
        # External id is stored before waiting for the vm to come up.
        self.node.external_id = "%s-%s" % (project, vm_name)
        self.zk.storeNode(self.node)

        vm_node_id = self.handler.manager.waitForVm(project, vm_name)
        ssh_port, ssh_endpoint = self.handler.manager.createService(
            project, vm_name, self.label)

        node = self.node
        node.state = zk.READY
        node.interface_ip = ssh_endpoint
        node.hostname = vm_node_id
        node.username = self.label.username
        node.connection_port = ssh_port
        node.host_keys = nodescan(ssh_endpoint, node.connection_port,
                                  timeout=100)
        node.python_path = self.label.python_path
        node.shell_type = self.label.shell_type
        node.connection_type = 'ssh'
        node.cloud = self.provider_config.context
        node.host_id = node.id
        self.zk.storeNode(node)
        self.log.info("Virtualmachine %s is ready" % node.external_id)
|
||||||
|
|
||||||
|
|
||||||
|
class OpenshiftVmRequestHandler(OpenshiftNodeRequestHandler):
    """Node request handler for the openshift virtualmachine driver."""

    log = logging.getLogger("nodepool.driver.openshiftvms."
                            "OpenshiftVmRequestHandler")

    def hasRemainingQuota(self, node_types):
        """Return True while one more vm still fits under max-resources."""
        return (len(self.manager.listNodes()) + 1
                <= self.provider.max_resources)

    def launch(self, node):
        """Start a launcher thread for the given node."""
        label = self.pool.labels[node.type[0]]
        launcher = OpenshiftVmLauncher(self, node, self.provider, label)
        launcher.start()
        self._threads.append(launcher)
|
|
@ -0,0 +1,127 @@
|
||||||
|
# Copyright 2018 Red Hat
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import urllib3
|
||||||
|
import time
|
||||||
|
|
||||||
|
from kubernetes.client.api import CustomObjectsApi
|
||||||
|
|
||||||
|
from nodepool.driver.utils_k8s import get_client
|
||||||
|
from nodepool.driver.utils import NodeDeleter
|
||||||
|
from nodepool.driver.openshift.provider import OpenshiftProvider
|
||||||
|
from nodepool.driver.openshiftvms import handler
|
||||||
|
|
||||||
|
urllib3.disable_warnings()
|
||||||
|
|
||||||
|
|
||||||
|
class OpenshiftVmsProvider(OpenshiftProvider):
    """Provider implementation managing kubevirt VirtualMachines.

    Reuses the openshift provider machinery but lists, creates and
    deletes VirtualMachine custom objects (plus their ssh services)
    instead of pods.
    """
    log = logging.getLogger("nodepool.driver.openshiftvms."
                            "OpenshiftVmsProvider")

    def __init__(self, provider, *args):
        # NOTE(review): the base OpenshiftProvider __init__ is not
        # invoked; this constructor sets up its own clients and state.
        # Confirm intentional before refactoring.
        self.provider = provider
        self.ready = False
        self.token, self.ca_crt, self.k8s_client, self.custom_client = \
            get_client(self.log, provider.context, CustomObjectsApi)
        # Label names of every pool; used to recognize the vms we manage.
        self.vm_names = set()
        for pool in provider.pools.values():
            self.vm_names.update(pool.labels.keys())

    def start(self, zk_conn):
        """Mark the provider ready once a custom-objects client exists."""
        self.log.debug("Starting")
        self._zk = zk_conn
        if self.ready or not self.custom_client:
            return
        self.ready = True

    def listNodes(self):
        """Return server-like records for every vm in our pools."""
        servers = []

        class FakeServer:
            """Adapter giving a vm dict the minimal server interface."""

            def __init__(self, pool, vm, provider, valid_names):
                self.id = vm['metadata']['name']
                self.name = self.id
                self.metadata = {}

                if [True for valid_name in valid_names
                    if vm['metadata']['name'].startswith(
                        "%s-" % valid_name)]:
                    node_id = vm['metadata']['name'].split('-')[-1]
                    try:
                        # Make sure last component of name is an id
                        int(node_id)
                        self.metadata['nodepool_provider_name'] = provider
                        self.metadata['nodepool_node_id'] = node_id
                    except Exception:
                        # Probably not a managed vm, let's skip metadata
                        pass

            def get(self, name, default=None):
                return getattr(self, name, default)

        if self.ready:
            for pool in self.provider.pools.keys():
                for vm in self.custom_client.list_namespaced_custom_object(
                        group='kubevirt.io', plural='virtualmachines',
                        version='v1', namespace=pool)['items']:
                    servers.append(FakeServer(
                        pool, vm, self.provider.name, self.vm_names))
        return servers

    def getProjectVmName(self, server_id):
        """Split an external id into (project/pool name, vm name).

        :returns: (None, None) when the id matches no known pool.
        """
        for pool in self.provider.pools.keys():
            if server_id.startswith("%s-" % pool):
                vm_name = server_id[len(pool) + 1:]
                return pool, vm_name
        return None, None

    def startNodeCleanup(self, node):
        """Kick off asynchronous deletion of a node's resources."""
        t = NodeDeleter(self._zk, self, node)
        t.start()
        return t

    def cleanupNode(self, server_id):
        """Delete the vm and its ssh service named by server_id."""
        if not self.ready:
            return
        # Look for pool name
        project_name, vm_name = self.getProjectVmName(server_id)
        if not project_name:
            self.log.exception("%s: unknown pool" % server_id)
            return
        self.log.debug("%s: removing vm" % vm_name)
        try:
            self.custom_client.delete_namespaced_custom_object(
                group='kubevirt.io', plural='virtualmachines',
                version='v1', name=vm_name, namespace=project_name)
            self.log.info("%s: vm removed" % server_id)
        except Exception:
            # TODO: implement better exception handling
            self.log.exception("Couldn't remove vm %s" % server_id)
        self.log.debug("%s: removing service" % vm_name)
        # BUG FIX: service_name must be pre-initialized -- if the
        # service lookup below raises, the except clause would
        # otherwise fail with UnboundLocalError while formatting the
        # log message.
        service_name = None
        try:
            service = self.k8s_client.list_namespaced_service(
                project_name,
                label_selector="vm={}".format(vm_name)).items[0]
            service_name = service.metadata.name
            self.k8s_client.delete_namespaced_service(
                service_name, project_name)
            self.log.info("%s: service removed" % service_name)
        except Exception:
            self.log.exception(
                "Couldn't remove service %s" % (service_name or vm_name))

    def waitForNodeCleanup(self, server_id):
        """Block until the vm is gone (or the retry window elapses)."""
        project_name, vm_name = self.getProjectVmName(server_id)
        for retry in range(300):
            try:
                self.custom_client.get_namespaced_custom_object(
                    group='kubevirt.io', plural='virtualmachines',
                    version='v1', name=vm_name, namespace=project_name)
            except Exception:
                # NOTE(review): any API error (not only a 404) ends the
                # wait and is treated as "vm gone" -- confirm acceptable.
                break
            time.sleep(1)

    def getRequestHandler(self, poolworker, request):
        """Return the request handler for this driver."""
        return handler.OpenshiftVmRequestHandler(poolworker, request)
|
Loading…
Reference in New Issue