Merge "Implement an OpenShift Pod provider"

This commit is contained in:
Zuul 2019-05-13 15:30:25 +00:00 committed by Gerrit Code Review
commit 6182f7bc3a
13 changed files with 581 additions and 6 deletions

View File

@ -376,6 +376,11 @@ Options
AWS driver, see the separate section
:attr:`providers.[aws]`
.. value:: openshiftpods
For details on the extra options required and provided by the
openshiftpods driver, see the separate section
:attr:`providers.[openshiftpods]`
OpenStack Driver
----------------
@ -1353,6 +1358,103 @@ Selecting the openshift driver adds the following options to the
:value:`providers.[openshift].labels.type.pod` label type;
specifies the amount of memory in MB to request for the pod.
Openshift Pods Driver
---------------------
Selecting the openshift pods driver adds the following options to the
:attr:`providers` section of the configuration.
.. attr:: providers.[openshiftpods]
:type: list
The Openshift Pods driver is similar to the Openshift driver, but it
only supports pod labels, and all pods are created in a single project.
This enables using an unprivileged service account that doesn't require
the self-provisioner role.
Example:
.. code-block:: yaml
providers:
- name: cluster
driver: openshiftpods
context: unprivileged-context-name
pools:
- name: main
labels:
- name: openshift-pod
image: docker.io/fedora:28
.. attr:: context
:required:
Name of the context configured in ``kube/config``.
Before using the driver, Nodepool services need a ``kube/config`` file
manually installed with a context for the service account. Unlike the
openshift driver, this account does not need the self-provisioner role.
Make sure the context is present in ``oc config get-contexts`` command
output.
.. attr:: launch-retries
:default: 3
The number of times to retry launching a pod before considering
the job failed.
.. attr:: max-pods
:default: infinite
:type: int
Maximum number of pods that can be used.
.. attr:: pools
:type: list
A pool defines a group of resources from an Openshift provider.
.. attr:: name
:required:
The project's name that will be used to create the pods.
.. attr:: labels
:type: list
Each entry in a pool's ``labels`` section indicates that the
corresponding label is available for use in this pool.
Each entry is a dictionary with the following keys:
.. attr:: name
:required:
Identifier for this label; references an entry in the
:attr:`labels` section.
.. attr:: image
The image name.
.. attr:: image-pull
:default: IfNotPresent
:type: str
The ImagePullPolicy, can be IfNotPresent, Always or Never.
.. attr:: cpu
:type: int
The number of CPUs to request for the pod.
.. attr:: memory
:type: int
The amount of memory in MB to request for the pod.
AWS EC2 Driver
--------------

View File

@ -22,7 +22,7 @@ from nodepool.driver.utils import NodeLauncher
from nodepool.driver import NodeRequestHandler
class OpenShiftLauncher(NodeLauncher):
class OpenshiftLauncher(NodeLauncher):
def __init__(self, handler, node, provider_config, provider_label):
super().__init__(handler.zk, node, provider_config)
self.handler = handler
@ -39,7 +39,8 @@ class OpenShiftLauncher(NodeLauncher):
resource = self.handler.manager.prepareProject(project)
if self.label.type == "pod":
self.handler.manager.createPod(
project, self.label)
project, self.label.name, self.label)
self.handler.manager.waitForPod(project, self.label.name)
resource['pod'] = self.label.name
self.node.connection_type = "kubectl"
self.node.interface_ip = self.label.name
@ -134,6 +135,6 @@ class OpenshiftNodeRequestHandler(NodeRequestHandler):
def launch(self, node):
label = self.pool.labels[node.type[0]]
thd = OpenShiftLauncher(self, node, self.provider, label)
thd = OpenshiftLauncher(self, node, self.provider, label)
thd.start()
self._threads.append(thd)

View File

@ -194,7 +194,8 @@ class OpenshiftProvider(Provider):
self.log.info("%s: project created" % project)
return resource
def createPod(self, project, label):
def createPod(self, project, pod_name, label):
self.log.debug("%s: creating pod in project %s" % (pod_name, project))
spec_body = {
'name': label.name,
'image': label.image,
@ -215,15 +216,17 @@ class OpenshiftProvider(Provider):
pod_body = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'name': label.name},
'metadata': {'name': pod_name},
'spec': {
'containers': [spec_body],
},
'restartPolicy': 'Never',
}
self.k8s_client.create_namespaced_pod(project, pod_body)
def waitForPod(self, project, pod_name):
for retry in range(300):
pod = self.k8s_client.read_namespaced_pod(label.name, project)
pod = self.k8s_client.read_namespaced_pod(pod_name, project)
if pod.status.phase == "Running":
break
self.log.debug("%s: pod status is %s", project, pod.status.phase)

View File

@ -0,0 +1,37 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
from nodepool.driver import Driver
from nodepool.driver.openshiftpods.config import OpenshiftPodsProviderConfig
from nodepool.driver.openshiftpods.provider import OpenshiftPodsProvider
from openshift import config
class OpenshiftPodsDriver(Driver):
    """Driver entry point for providers declared with ``openshiftpods``."""

    def __init__(self):
        super().__init__()

    def reset(self):
        """Load the kube config, tolerating a missing config file."""
        try:
            config.load_kube_config(persist_config=True)
        except FileNotFoundError:
            # No kube config file could be found: there is nothing to load.
            pass

    def getProviderConfig(self, provider):
        """Return the config object wrapping the raw ``provider`` entry."""
        provider_config = OpenshiftPodsProviderConfig(self, provider)
        return provider_config

    def getProvider(self, provider_config):
        """Return the provider built from ``provider_config``."""
        pods_provider = OpenshiftPodsProvider(provider_config)
        return pods_provider

View File

@ -0,0 +1,67 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import voluptuous as v
from nodepool.driver.openshift.config import OpenshiftPool
from nodepool.driver.openshift.config import OpenshiftProviderConfig
class OpenshiftPodsProviderConfig(OpenshiftProviderConfig):
    """Configuration of an ``openshiftpods`` provider entry."""

    def __eq__(self, other):
        """Compare with another config of the same class.

        Anything that is not an ``OpenshiftPodsProviderConfig`` is unequal.
        """
        if not isinstance(other, OpenshiftPodsProviderConfig):
            return False
        return (super().__eq__(other) and
                other.context == self.context and
                other.pools == self.pools)

    def load(self, config):
        """Read the provider options and build one pool object per pool.

        :param config: passed through unchanged to each pool's ``load``.
        """
        raw = self.provider
        self.launch_retries = int(raw.get('launch-retries', 3))
        self.context = raw['context']
        # Without an explicit limit the number of pods is unbounded.
        self.max_pods = raw.get('max-pods', math.inf)
        for pool_data in raw.get('pools', []):
            # This driver only handles pods, so every label is forced to
            # the pod type before the pool parses it.
            for label_data in pool_data.get('labels', []):
                label_data['type'] = 'pod'
            pool = OpenshiftPool()
            pool.load(pool_data, config)
            pool.provider = self
            self.pools[pool.name] = pool

    def getSchema(self):
        """Return the voluptuous schema validating this provider entry."""
        label_schema = {
            v.Required('name'): str,
            v.Required('image'): str,
            'image-pull': str,
            'cpu': int,
            'memory': int,
            'python-path': str,
        }
        pool_schema = {
            v.Required('name'): str,
            v.Required('labels'): [label_schema],
        }

        # Start from the options shared with the openshift driver and add
        # the keys specific to this one.
        schema = OpenshiftProviderConfig.getCommonSchemaDict()
        schema[v.Required('pools')] = [pool_schema]
        schema[v.Required('context')] = str
        schema['launch-retries'] = int
        schema['max-pods'] = int
        return v.Schema(schema)

View File

@ -0,0 +1,65 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from nodepool import zk
from nodepool.driver.openshift.handler import OpenshiftLauncher
from nodepool.driver.openshift.handler import OpenshiftNodeRequestHandler
class OpenshiftPodLauncher(OpenshiftLauncher):
    """Launcher that backs a node with one pod in the pool's project."""

    def _launchLabel(self):
        """Create the pod, wait for it, then publish the node as ready.

        The node is stored twice: right after the pod has been created, so
        that its external id is recorded, and again once all connection
        details are filled in.
        """
        self.log.debug("Creating resource")
        manager = self.handler.manager
        project = self.handler.pool.name
        pod_name = "{}-{}".format(self.label.name, self.node.id)

        manager.createPod(project, pod_name, self.label)
        self.node.external_id = "{}-{}".format(project, pod_name)
        self.node.interface_ip = pod_name
        self.zk.storeNode(self.node)

        manager.waitForPod(project, pod_name)

        self.node.state = zk.READY
        self.node.python_path = self.label.python_path
        # NOTE: resource access token may be encrypted here
        api_configuration = manager.k8s_client.api_client.configuration
        connection = {
            'pod': pod_name,
            'namespace': project,
            'host': api_configuration.host,
            'skiptls': not api_configuration.verify_ssl,
            'token': manager.token,
            'user': 'zuul-worker',
        }
        self.node.connection_port = connection
        self.node.connection_type = "kubectl"
        self.zk.storeNode(self.node)
        self.log.info("Pod %s is ready" % self.node.external_id)
class OpenshiftPodRequestHandler(OpenshiftNodeRequestHandler):
    """Request handler that launches nodes through OpenshiftPodLauncher."""

    log = logging.getLogger("nodepool.driver.openshiftpods."
                            "OpenshiftPodRequestHandler")

    def hasRemainingQuota(self, node_types):
        """Tell whether one more pod still fits under ``max_pods``.

        ``node_types`` is not consulted; only the number of nodes listed
        by the manager matters.
        """
        pods_after_launch = len(self.manager.listNodes()) + 1
        return not pods_after_launch > self.provider.max_pods

    def launch(self, node):
        """Start a launcher for ``node`` and keep track of it."""
        node_label = self.pool.labels[node.type[0]]
        launcher = OpenshiftPodLauncher(
            self, node, self.provider, node_label)
        launcher.start()
        self._threads.append(launcher)

View File

@ -0,0 +1,132 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import urllib3
import time
from kubernetes.config import config_exception as kce
from kubernetes import client as k8s_client
from openshift import config
from nodepool.driver.openshift.provider import OpenshiftProvider
from nodepool.driver.openshiftpods import handler
urllib3.disable_warnings()
class OpenshiftPodsProvider(OpenshiftProvider):
    """Provider managing pods inside the projects named by its pools.

    Each pool name of the provider configuration is used as the project
    (namespace) in which pods are listed, read and deleted.
    """

    log = logging.getLogger("nodepool.driver.openshiftpods."
                            "OpenshiftPodsProvider")

    def __init__(self, provider, *args):
        """Build the API client for the context of ``provider``.

        A kube config error is logged and leaves ``token`` and
        ``k8s_client`` set to None; ``start`` then never marks the
        provider ready.

        :param provider: provider configuration; ``context``, ``pools`` and
            ``name`` are read from it by this class.
        :param args: accepted and ignored.
        """
        self.provider = provider
        self.ready = False
        try:
            self.token, self.k8s_client = self._get_client(
                provider.context)
        except kce.ConfigException:
            self.log.exception("Couldn't load client from config")
            self.log.info("Get context list using this command: "
                          "python3 -c \"from openshift import config; "
                          "print('\\n'.join([i['name'] for i in "
                          "config.list_kube_config_contexts()[0]]))\"")
            self.token = None
            self.k8s_client = None
        # Label names are the prefixes of the pod names this provider
        # recognises as its own in listNodes.
        self.pod_names = set()
        for pool in provider.pools.values():
            self.pod_names.update(pool.labels)

    def _get_client(self, context):
        """Return a ``(token, CoreV1Api client)`` pair for ``context``.

        The token is the last whitespace separated word of the
        ``authorization`` api key, or None when that key is absent or
        empty.
        """
        conf = config.new_client_from_config(context=context)
        authorization = conf.configuration.api_key.get('authorization', '')
        # An absent or empty value splits to an empty list; indexing it
        # would raise IndexError, so fall back to no token instead.
        words = authorization.split()
        token = words[-1] if words else None
        return (token, k8s_client.CoreV1Api(conf))

    def start(self, zk_conn):
        """Mark the provider ready, provided a client could be built."""
        self.log.debug("Starting")
        if self.ready or not self.k8s_client:
            return
        self.ready = True

    def listNodes(self):
        """Return one server-like object per pod found in every pool.

        An empty list is returned while the provider is not ready.
        """
        servers = []

        class FakeServer:
            """Minimal object exposing ``id``, ``name`` and ``metadata``."""

            def __init__(self, pool, pod, provider, valid_names):
                self.id = "%s-%s" % (pool, pod.metadata.name)
                self.name = self.id
                self.metadata = {}

                pod_name = pod.metadata.name
                if not any(pod_name.startswith("%s-" % valid_name)
                           for valid_name in valid_names):
                    return
                node_id = pod_name.split('-')[-1]
                try:
                    # Make sure last component of name is an id
                    int(node_id)
                except ValueError:
                    # Probably not a managed pod, let's skip metadata
                    return
                self.metadata['nodepool_provider_name'] = provider
                self.metadata['nodepool_node_id'] = node_id

            def get(self, name, default=None):
                return getattr(self, name, default)

        if self.ready:
            for pool in self.provider.pools:
                for pod in self.k8s_client.list_namespaced_pod(pool).items:
                    servers.append(FakeServer(
                        pool, pod, self.provider.name, self.pod_names))
        return servers

    def getProjectPodName(self, server_id):
        """Split ``server_id`` into ``(project, pod name)``.

        The first pool whose name followed by a dash prefixes ``server_id``
        is the project. ``(None, None)`` is returned when no pool matches.
        """
        for pool in self.provider.pools:
            prefix = "%s-" % pool
            if server_id.startswith(prefix):
                return pool, server_id[len(prefix):]
        return None, None

    def cleanupNode(self, server_id):
        """Delete the pod behind ``server_id``; failures are only logged."""
        if not self.ready:
            return
        # Look for pool name
        project_name, pod_name = self.getProjectPodName(server_id)
        if not project_name:
            # No exception is being handled here, so log.exception would
            # attach a meaningless "NoneType: None" traceback.
            self.log.error("%s: unknown pool" % server_id)
            return
        self.log.debug("%s: removing pod" % pod_name)
        delete_body = {
            "apiVersion": "v1",
            "kind": "DeleteOptions",
            "propagationPolicy": "Background"
        }
        try:
            self.k8s_client.delete_namespaced_pod(
                pod_name, project_name, delete_body)
            self.log.info("%s: pod removed" % server_id)
        except Exception:
            # TODO: implement better exception handling
            self.log.exception("Couldn't remove pod %s" % server_id)

    def waitForNodeCleanup(self, server_id):
        """Poll once per second, up to 300 times, until the pod is gone.

        Any error raised while reading the pod is taken to mean that it no
        longer exists.
        """
        project_name, pod_name = self.getProjectPodName(server_id)
        if not project_name:
            # Unknown pool: there is no pod to wait for.
            return
        for retry in range(300):
            try:
                self.k8s_client.read_namespaced_pod(pod_name, project_name)
            except Exception:
                break
            time.sleep(1)

    def getRequestHandler(self, poolworker, request):
        """Return the request handler serving ``request``."""
        return handler.OpenshiftPodRequestHandler(poolworker, request)

View File

@ -170,6 +170,15 @@ providers:
volume-type: gp2
volume-size: 80
- name: openshift-single-project
driver: openshiftpods
context: "/hostname:8443/developer"
pools:
- name: project-name
labels:
- name: openshift-pod
image: docker.io/fedora:28
diskimages:
- name: trusty
formats:

View File

@ -0,0 +1,18 @@
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
labels:
- name: openshift-pod
min-ready: 1
providers:
- name: openshift
driver: openshiftpods
context: {context_name}
pools:
- name: myproject
labels:
- name: openshift-pod
image: docker.io/fedora:28

View File

@ -0,0 +1,17 @@
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
labels:
- name: pod-fedora
providers:
- name: openshift
driver: openshiftpods
context: service-account.local
pools:
- name: main
labels:
- name: pod-fedora
image: docker.io/fedora:28

View File

@ -48,3 +48,12 @@ class TestOpenShift(tests.DBTestCase):
nodes = self.waitForNodes("openshift-pod", 1)
self.assertEqual(1, len(nodes))
self.assertEqual(nodes[0].connection_type, "kubectl")
def test_pods(self):
    """One ``openshift-pod`` node comes up from the ``pods.yaml`` config."""
    config_path = self.setup_config('pods.yaml')
    nodepool = self.useNodepool(config_path, watermark_sleep=1)
    nodepool.start()
    ready_nodes = self.waitForNodes("openshift-pod", 1)
    self.assertEqual(1, len(ready_nodes))
    first_node = ready_nodes[0]
    self.assertEqual(first_node.connection_type, "kubectl")

View File

@ -0,0 +1,110 @@
# Copyright (C) 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fixtures
import logging
from nodepool import tests
from nodepool import zk
from nodepool.driver.openshiftpods import provider
class FakeCoreClient(object):
    """In-memory stand-in for the pod calls made on the core API client."""

    def __init__(self):
        self.pods = []

        class FakeApi:
            class configuration:
                host = "http://localhost:8080"
                verify_ssl = False

        self.api_client = FakeApi()

    def _find_pod(self, name):
        """Return the first recorded pod called ``name``, or None."""
        return next(
            (pod for pod in self.pods if pod.metadata.name == name), None)

    def list_namespaced_pod(self, project):
        """Return an object whose ``items`` is the list of recorded pods."""
        recorded = self.pods

        class FakePods:
            items = recorded

        return FakePods

    def create_namespaced_pod(self, ns, pod_body):
        """Record and return a pod named after ``pod_body`` metadata."""
        pod_name = pod_body['metadata']['name']

        class FakePod:
            class metadata:
                name = pod_name

        self.pods.append(FakePod)
        return FakePod

    def read_namespaced_pod(self, name, ns):
        """Return a running pod status, or raise if ``name`` is unknown."""
        if self._find_pod(name) is None:
            raise RuntimeError("Pod doesn't exists")

        class FakePod:
            class status:
                phase = "Running"

        return FakePod

    def delete_namespaced_pod(self, name, project, delete_body):
        """Forget the pod called ``name``, or raise if it is unknown."""
        target = self._find_pod(name)
        if target is None:
            raise RuntimeError("Unknown pod %s" % name)
        self.pods.remove(target)
class TestDriverOpenshiftPods(tests.DBTestCase):
    """Exercise the openshiftpods driver against a fake API client."""

    log = logging.getLogger("nodepool.TestDriverOpenshiftPods")

    def setUp(self):
        """Patch the provider so it hands out the fake client and token."""
        super().setUp()
        self.fake_k8s_client = FakeCoreClient()

        # The replacement ignores whatever arguments it is called with.
        get_client_patch = fixtures.MockPatchObject(
            provider.OpenshiftPodsProvider,
            '_get_client',
            lambda *args: ("fake-token", self.fake_k8s_client),
        )
        self.useFixture(get_client_patch)

    def test_openshift_pod(self):
        """A pod-fedora request is fulfilled, then its node is deleted."""
        config_path = self.setup_config('openshiftpods.yaml')
        nodepool = self.useNodepool(config_path, watermark_sleep=1)
        nodepool.start()

        request = zk.NodeRequest()
        request.state = zk.REQUESTED
        request.node_types.append('pod-fedora')
        self.zk.storeNodeRequest(request)

        self.log.debug("Waiting for request %s", request.id)
        request = self.waitForNodeRequest(request)
        self.assertEqual(request.state, zk.FULFILLED)
        self.assertNotEqual(request.nodes, [])

        node = self.zk.getNode(request.nodes[0])
        self.assertEqual(node.allocated_to, request.id)
        self.assertEqual(node.state, zk.READY)
        self.assertIsNotNone(node.launcher)
        self.assertEqual(node.connection_type, 'kubectl')
        self.assertEqual(node.connection_port.get('token'), 'fake-token')

        # Flag the node for deletion and wait until it is gone.
        node.state = zk.DELETING
        self.zk.storeNode(node)
        self.waitForNodeDeletion(node)

View File

@ -0,0 +1,5 @@
---
features:
- |
A new driver is available to support an unprivileged Openshift cluster as a
resource provider, enabling pod creation within a developer project.