zuul-operator/zuul_operator/utils.py

101 lines
2.7 KiB
Python

# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import secrets
import string
import kopf
import yaml
import jinja2
import kubernetes
from kubernetes.client import Configuration
from kubernetes.client.api import core_v1_api
from kubernetes.stream import stream
from . import objects
def object_from_dict(data):
return objects.get_object(data['apiVersion'], data['kind'])
def zuul_to_json(x):
return json.dumps(x)
def apply_file(api, fn, **kw):
env = jinja2.Environment(
loader=jinja2.PackageLoader('zuul_operator', 'templates'))
env.filters['zuul_to_json'] = zuul_to_json
tmpl = env.get_template(fn)
text = tmpl.render(**kw)
data = yaml.safe_load_all(text)
namespace = kw.get('namespace')
for document in data:
if namespace:
document['metadata']['namespace'] = namespace
if kw.get('_adopt', True):
kopf.adopt(document)
obj = object_from_dict(document)(api, document)
if not obj.exists():
obj.create()
else:
obj.update()
def generate_password(length=32):
alphabet = string.ascii_letters + string.digits
return ''.join(secrets.choice(alphabet) for i in range(length))
def make_secret(namespace, name, string_data):
return {
'apiVersion': 'v1',
'kind': 'Secret',
'metadata': {
'namespace': namespace,
'name': name,
},
'stringData': string_data
}
def update_secret(api, namespace, name, string_data):
obj = make_secret(namespace, name, string_data)
secret = objects.Secret(api, obj)
if secret.exists():
secret.update()
else:
secret.create()
def pod_exec(namespace, name, command):
kubernetes.config.load_kube_config()
try:
c = Configuration().get_default_copy()
except AttributeError:
c = Configuration()
c.assert_hostname = False
Configuration.set_default(c)
api = core_v1_api.CoreV1Api()
resp = stream(api.connect_get_namespaced_pod_exec,
name,
namespace,
command=command,
stderr=True, stdin=False,
stdout=True, tty=False)
return resp