First step in breaking it down is to make it an importable library rather than a self-contained script.

Change-Id: Ibbc924c2849cc79006046f62f31a1909ce959066
Partially-implements: bp build-refactor
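The intent is that, once this becomes an importable library, a consumer can drive the same classes directly instead of invoking the script. A rough sketch of that usage follows; everything other than the classes defined in this diff (the queue wiring, and the `conf` and `base_image` placeholders) is an illustrative assumption, not part of this change:

    from six.moves import queue

    build_queue = queue.Queue()
    push_queue = queue.Queue()

    # 'conf' and 'base_image' are placeholders for an oslo.config object and an
    # Image built from the kolla image tree; constructing them is not shown here.
    build_queue.put(BuildTask(conf, base_image, push_queue))

    worker = WorkerThread(conf, build_queue)
    worker.start()

    build_queue.put(WorkerThread.tombstone)  # ask the worker to exit
    worker.join()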
6 changed files with 934 additions and 917 deletions
@@ -0,0 +1,924 @@
#!/usr/bin/env python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import datetime
import errno
import graphviz
import json
import logging
import os
import pprint
import re
import requests
import shutil
import signal
import sys
import tarfile
import tempfile
import threading
import time

import docker
import git
import jinja2
from oslo_config import cfg
from requests import exceptions as requests_exc
import six

PROJECT_ROOT = os.path.abspath(os.path.join(
    os.path.dirname(os.path.realpath(__file__)), '../..'))

# NOTE(SamYaple): Update the search path to prefer PROJECT_ROOT as the source
#                 of packages to import if we are using local tools/build.py
#                 instead of pip installed kolla-build tool
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

from kolla.common import config as common_config
from kolla.common import task
from kolla import version

logging.basicConfig()
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)


def handle_ctrlc(single, frame):
    kollaobj = frame.f_locals['kolla']
    kollaobj.cleanup()
    sys.exit(1)

signal.signal(signal.SIGINT, handle_ctrlc)
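# NOTE: the SIGINT handler above reaches into the interrupted stack frame and
# expects a local variable named 'kolla' with a cleanup() method, so the build
# state can be cleaned up before exiting.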


class KollaDirNotFoundException(Exception):
    pass


class KollaUnknownBuildTypeException(Exception):
    pass


class KollaMismatchBaseTypeException(Exception):
    pass


class KollaRpmSetupUnknownConfig(Exception):
    pass


# Image status constants.
#
# TODO(harlowja): use enum lib in the future??
STATUS_CONNECTION_ERROR = 'connection_error'
STATUS_PUSH_ERROR = 'push_error'
STATUS_ERROR = 'error'
STATUS_PARENT_ERROR = 'parent_error'
STATUS_BUILT = 'built'
STATUS_BUILDING = 'building'
STATUS_UNMATCHED = 'unmatched'
STATUS_MATCHED = 'matched'
STATUS_UNPROCESSED = 'unprocessed'

# All error status constants.
STATUS_ERRORS = (STATUS_CONNECTION_ERROR, STATUS_PUSH_ERROR,
                 STATUS_ERROR, STATUS_PARENT_ERROR)


class Recorder(object):
    """Recorder/buffer of (unicode) log lines for eventual display."""

    def __init__(self):
        self._lines = []

    def write(self, text=""):
        if isinstance(text, six.text_type):
            self._lines.append(text)
        elif isinstance(text, six.binary_type):
            self._lines.append(text.decode('utf8'))
        elif isinstance(text, Recorder):
            self._lines.extend(text._lines)
        else:
            self.write(text=str(text))

    def clear(self):
        self._lines = []

    def __iter__(self):
        for line in self._lines:
            yield line

    def __str__(self):
        return u"\n".join(self._lines)

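# Illustrative behaviour of Recorder (example added for clarity, not part of
# the original file):
#
#   rec = Recorder()
#   rec.write('step one')
#   rec.write(b'step two')   # bytes are decoded as utf8
#   str(rec)                 # -> u'step one\nstep two'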

def docker_client():
    try:
        docker_kwargs = docker.utils.kwargs_from_env()
        return docker.Client(version='auto', **docker_kwargs)
    except docker.errors.DockerException:
        LOG.exception('Can not communicate with docker service. '
                      'Please check docker service is running without errors')
        sys.exit(1)
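# NOTE: docker.Client above is the low-level client from docker-py as it was
# named at the time; docker-py 2.0 and later expose the same interface as
# docker.APIClient instead.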


class Image(object):
    def __init__(self, name, canonical_name, path, parent_name='',
                 status=STATUS_UNPROCESSED, parent=None, source=None):
        self.name = name
        self.canonical_name = canonical_name
        self.path = path
        self.status = status
        self.parent = parent
        self.source = source
        self.parent_name = parent_name
        self.logs = Recorder()
        self.push_logs = Recorder()
        self.children = []
        self.plugins = []

    def __repr__(self):
        return ("Image(%s, %s, %s, parent_name=%s,"
                " status=%s, parent=%s, source=%s)") % (
            self.name, self.canonical_name, self.path,
            self.parent_name, self.status, self.parent, self.source)


class PushIntoQueueTask(task.Task):
    """Task that pushes some other task into a queue."""

    def __init__(self, push_task, push_queue):
        super(PushIntoQueueTask, self).__init__()
        self.push_task = push_task
        self.push_queue = push_queue

    @property
    def name(self):
        return 'PushIntoQueueTask(%s=>%s)' % (self.push_task.name,
                                              self.push_queue)

    def run(self):
        self.push_queue.put(self.push_task)
        self.success = True


class PushTask(task.Task):
    """Task that pushes an image to a docker repository."""

    def __init__(self, conf, image):
        super(PushTask, self).__init__()
        self.dc = docker_client()
        self.conf = conf
        self.image = image

    @property
    def name(self):
        return 'PushTask(%s)' % self.image.name

    def run(self):
        image = self.image
        try:
            LOG.debug('%s:Try to push the image', image.name)
            self.push_image(image)
        except requests_exc.ConnectionError:
            LOG.exception('%s:Make sure Docker is running and that you'
                          ' have the correct privileges to run Docker'
                          ' (root)', image.name)
            image.status = STATUS_CONNECTION_ERROR
        except Exception:
            LOG.exception('%s:Unknown error when pushing', image.name)
            image.status = STATUS_PUSH_ERROR
        finally:
            if (image.status not in STATUS_ERRORS
                    and image.status != STATUS_UNPROCESSED):
                LOG.info('%s:Pushed successfully', image.name)
                self.success = True
            else:
                self.success = False

    def push_image(self, image):
        image.push_logs.clear()
        for response in self.dc.push(image.canonical_name,
                                     stream=True,
                                     insecure_registry=True):
            stream = json.loads(response)
            if 'stream' in stream:
                image.push_logs.write(image.logs)
                image.push_logs.write(stream['stream'])
                LOG.info('%s', stream['stream'])
            elif 'errorDetail' in stream:
                image.status = STATUS_ERROR
                LOG.error(stream['errorDetail']['message'])


class BuildTask(task.Task):
    """Task that builds out an image."""

    def __init__(self, conf, image, push_queue):
        super(BuildTask, self).__init__()
        self.conf = conf
        self.image = image
        self.dc = docker_client()
        self.push_queue = push_queue
        self.nocache = not conf.cache or conf.no_cache
        self.forcerm = not conf.keep

    @property
    def name(self):
        return 'BuildTask(%s)' % self.image.name

    def run(self):
        self.builder(self.image)
        if self.image.status == STATUS_BUILT:
            self.success = True

    @property
    def followups(self):
        followups = []
        if self.conf.push and self.success:
            followups.extend([
                # If we are supposed to push the image into a docker
                # repository, then make sure we do that...
                PushIntoQueueTask(
                    PushTask(self.conf, self.image),
                    self.push_queue),
            ])
        if self.image.children and self.success:
            for image in self.image.children:
                followups.append(BuildTask(self.conf, image, self.push_queue))
        return followups
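    # NOTE: followups are what fan the build out: a successful BuildTask queues
    # a PushTask (wrapped in PushIntoQueueTask) when pushing is enabled, plus
    # one BuildTask per child image, so children are only built after their
    # parent has built successfully.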

    def process_source(self, image, source):
        dest_archive = os.path.join(image.path, source['name'] + '-archive')

        if source.get('type') == 'url':
            LOG.debug("%s:Getting archive from %s", image.name,
                      source['source'])
            try:
                r = requests.get(source['source'], timeout=self.conf.timeout)
            except requests_exc.Timeout:
                LOG.exception('Request timed out while getting archive'
                              ' from %s', source['source'])
                image.status = STATUS_ERROR
                image.logs.clear()
                return

            if r.status_code == 200:
                with open(dest_archive, 'wb') as f:
                    f.write(r.content)
            else:
                LOG.error('%s:Failed to download archive: status_code %s',
                          image.name, r.status_code)
                image.status = STATUS_ERROR
                return

        elif source.get('type') == 'git':
            clone_dir = '{}-{}'.format(dest_archive,
                                       source['reference'].replace('/', '-'))
            try:
                LOG.debug("%s:Cloning from %s", image.name,
                          source['source'])
                git.Git().clone(source['source'], clone_dir)
                git.Git(clone_dir).checkout(source['reference'])
                reference_sha = git.Git(clone_dir).rev_parse('HEAD')
                LOG.debug("%s:Git checkout by reference %s (%s)",
                          image.name, source['reference'], reference_sha)
            except Exception as e:
                LOG.error("%s:Failed to get source from git", image.name)
                LOG.error("%s:Error:%s", image.name, str(e))
                # clean-up clone folder to retry
                shutil.rmtree(clone_dir)
                image.status = STATUS_ERROR
                return

            with tarfile.open(dest_archive, 'w') as tar:
                tar.add(clone_dir, arcname=os.path.basename(clone_dir))

        elif source.get('type') == 'local':
            LOG.debug("%s:Getting local archive from %s", image.name,
                      source['source'])
            if os.path.isdir(source['source']):
                with tarfile.open(dest_archive, 'w') as tar:
                    tar.add(source['source'],
                            arcname=os.path.basename(source['source']))
            else:
                shutil.copyfile(source['source'], dest_archive)

        else:
            LOG.error("%s:Wrong source type '%s'", image.name,
                      source.get('type'))
            image.status = STATUS_ERROR
            return

        # Set time on destination archive to epoch 0
        os.utime(dest_archive, (0, 0))

        return dest_archive

    def update_buildargs(self):
        buildargs = dict()
        if self.conf.build_args:
            buildargs = dict(self.conf.build_args)

        proxy_vars = ('HTTP_PROXY', 'http_proxy', 'HTTPS_PROXY',
                      'https_proxy', 'FTP_PROXY', 'ftp_proxy',
                      'NO_PROXY', 'no_proxy')

        for proxy_var in proxy_vars:
            if proxy_var in os.environ and proxy_var not in buildargs:
                buildargs[proxy_var] = os.environ.get(proxy_var)

        if not buildargs:
            return None
        return buildargs
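    # Illustrative result of update_buildargs() (values are made up): with
    # conf.build_args containing {'foo': 'bar'} and http_proxy set in the
    # environment, it returns something like
    # {'foo': 'bar', 'http_proxy': 'http://proxy.example.com:3128'}.
    # Environment proxy variables never override explicitly configured args,
    # and None is returned when there is nothing to pass to docker build.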

    def builder(self, image):
        LOG.debug('%s:Processing', image.name)
        if image.status == STATUS_UNMATCHED:
            return

        if (image.parent is not None and
                image.parent.status in STATUS_ERRORS):
            LOG.error('%s:Parent image error\'d with message "%s"',
                      image.name, image.parent.status)
            image.status = STATUS_PARENT_ERROR
            return

        image.status = STATUS_BUILDING
        LOG.info('%s:Building', image.name)

        if image.source and 'source' in image.source:
            self.process_source(image, image.source)
            if image.status in STATUS_ERRORS:
                return

        plugin_archives = list()
        plugins_path = os.path.join(image.path, 'plugins')
        for plugin in image.plugins:
            archive_path = self.process_source(image, plugin)
            if image.status in STATUS_ERRORS:
                return
            plugin_archives.append(archive_path)
        if plugin_archives:
            for plugin_archive in plugin_archives:
                with tarfile.open(plugin_archive, 'r') as plugin_archive_tar:
                    plugin_archive_tar.extractall(path=plugins_path)
        else:
            try:
                os.mkdir(plugins_path)
            except OSError as e:
                if e.errno == errno.EEXIST:
                    LOG.info('Directory %s already exists. Skipping.',
                             plugins_path)
                else:
                    LOG.error('Failed to create directory %s: %s',
                              plugins_path, e)
                    image.status = STATUS_CONNECTION_ERROR
                    return
        with tarfile.open(os.path.join(image.path, 'plugins-archive'),
                          'w') as tar:
            tar.add(plugins_path, arcname='plugins')

        # Pull the latest image for the base distro only
        pull = True if image.parent is None else False

        image.logs.clear()
        buildargs = self.update_buildargs()
        for response in self.dc.build(path=image.path,
                                      tag=image.canonical_name,
                                      nocache=not self.conf.cache,
                                      rm=True,
                                      pull=pull,
                                      forcerm=self.forcerm,
                                      buildargs=buildargs):
            stream = json.loads(response.decode('utf-8'))

            if 'stream' in stream:
                image.logs.write(stream['stream'])
                for line in stream['stream'].split('\n'):
                    if line:
                        LOG.info('%s:%s', image.name, line)

            if 'errorDetail' in stream:
                image.status = STATUS_ERROR
                LOG.error('%s:Error\'d with the following message',
                          image.name)
                for line in stream['errorDetail']['message'].split('\n'):
                    if line:
                        LOG.error('%s:%s', image.name, line)
                return

        image.status = STATUS_BUILT

        LOG.info('%s:Built', image.name)


class WorkerThread(threading.Thread):
    """Thread that executes tasks until the queue provides a tombstone."""

    #: Object to be put on worker queues to get them to die.
    tombstone = object()

    def __init__(self, conf, queue):
        super(WorkerThread, self).__init__()
        self.queue = queue
        self.conf = conf

    def run(self):
        while True:
            task = self.queue.get()
            if task is self.tombstone: