diff --git a/kolla/cmd/build.py b/kolla/cmd/build.py index e618899e5f..dee1bb7902 100755 --- a/kolla/cmd/build.py +++ b/kolla/cmd/build.py @@ -48,6 +48,7 @@ if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) from kolla.common import config as common_config +from kolla.common import task from kolla import version logging.basicConfig() @@ -89,33 +90,56 @@ def docker_client(): sys.exit(1) -class PushThread(threading.Thread): +class PushIntoQueueTask(task.Task): + """Task that pushes some other task into a queue.""" - def __init__(self, conf, queue): - super(PushThread, self).__init__() - self.setDaemon(True) - self.conf = conf - self.queue = queue - self.dc = docker_client() + def __init__(self, push_task, push_queue): + super(PushIntoQueueTask, self).__init__() + self.push_task = push_task + self.push_queue = push_queue + + @property + def name(self): + return 'PushIntoQueueTask(%s=>%s)' % (self.push_task.name, + self.push_queue) def run(self): - while True: - try: - image = self.queue.get() - LOG.debug('%s:Try to push the image', image['name']) - self.push_image(image) - except requests_exc.ConnectionError: - LOG.exception('%s:Make sure Docker is running and that you' - ' have the correct privileges to run Docker' - ' (root)', image['name']) - image['status'] = "connection_error" - except Exception: - LOG.exception('%s:Unknown error when pushing', image['name']) - image['status'] = "push_error" - finally: - if "error" not in image['status']: - LOG.info('%s:Pushed successfully', image['name']) - self.queue.task_done() + self.push_queue.put(self.push_task) + self.success = True + + +class PushTask(task.Task): + """Task that pushes a image to a docker repository.""" + + def __init__(self, conf, image): + super(PushTask, self).__init__() + self.dc = docker_client() + self.conf = conf + self.image = image + + @property + def name(self): + return 'PushTask(%s)' % self.image['name'] + + def run(self): + image = self.image + try: + LOG.debug('%s:Try to push the image', image['name']) + self.push_image(image) + except requests_exc.ConnectionError: + LOG.exception('%s:Make sure Docker is running and that you' + ' have the correct privileges to run Docker' + ' (root)', image['name']) + image['status'] = "connection_error" + except Exception: + LOG.exception('%s:Unknown error when pushing', image['name']) + image['status'] = "push_error" + finally: + if "error" not in image['status']: + LOG.info('%s:Pushed successfully', image['name']) + self.success = True + else: + self.success = False def push_image(self, image): image['push_logs'] = str() @@ -133,45 +157,42 @@ class PushThread(threading.Thread): LOG.error(stream['errorDetail']['message']) -class WorkerThread(threading.Thread): +class BuildTask(task.Task): + """Task that builds out an image.""" - def __init__(self, queue, push_queue, conf): + def __init__(self, conf, image, push_queue): + super(BuildTask, self).__init__() self.conf = conf - self.queue = queue + self.image = image + self.dc = docker_client() self.push_queue = push_queue self.nocache = not conf.cache or conf.no_cache self.forcerm = not conf.keep - self.dc = docker_client() - super(WorkerThread, self).__init__() - def end_task(self, image): - """Properly inform the queue we are finished""" - # No matter whether the parent failed or not, we still process - # the children. We have the code in place to catch a parent in - # an 'error' status - for child in image['children']: - self.queue.put(child) - LOG.debug('%s:Added image to queue', child['name']) - self.queue.task_done() - LOG.debug('%s:Processed', image['name']) + @property + def name(self): + return 'BuildTask(%s)' % self.image['name'] def run(self): - """Executes tasks until the queue is empty""" - while True: - try: - image = self.queue.get() - for _ in six.moves.range(self.conf.retries + 1): - self.builder(image) - if image['status'] in ['built', 'unmatched', - 'parent_error']: - break - except requests_exc.ConnectionError: - LOG.exception('Make sure Docker is running and that you' - ' have the correct privileges to run Docker' - ' (root)') - image['status'] = "connection_error" - break - self.end_task(image) + self.builder(self.image) + if self.image['status'] == 'built': + self.success = True + + @property + def followups(self): + followups = [] + if self.conf.push and self.success: + followups.extend([ + # If we are supposed to push the image into a docker + # repository, then make sure we do that... + PushIntoQueueTask( + PushTask(self.conf, self.image), + self.push_queue), + ]) + if self.image['children'] and self.success: + for image in self.image['children']: + followups.append(BuildTask(self.conf, image, self.push_queue)) + return followups def process_source(self, image, source): dest_archive = os.path.join(image['path'], source['name'] + '-archive') @@ -337,8 +358,50 @@ class WorkerThread(threading.Thread): image['status'] = "built" LOG.info('%s:Built', image['name']) - if self.conf.push: - self.push_queue.put(image) + + +class WorkerThread(threading.Thread): + """Thread that executes tasks until the queue provides a tombstone.""" + + #: Object to be put on worker queues to get them to die. + tombstone = object() + + def __init__(self, conf, queue): + super(WorkerThread, self).__init__() + self.queue = queue + self.conf = conf + + def run(self): + while True: + task = self.queue.get() + if task is self.tombstone: + # Ensure any other threads also get the tombstone. + self.queue.put(task) + break + try: + for attempt in six.moves.range(self.conf.retries + 1): + if attempt > 0: + LOG.debug("Attempting to run task %s for the %s time", + task.name, attempt + 1) + else: + LOG.debug("Attempting to run task %s for the first" + " time", task.name) + try: + task.run() + if task.success: + break + except Exception: + LOG.exception('Unhandled error when running %s', + task.name) + # try again... + task.reset() + if task.success: + for next_task in task.followups: + LOG.debug('Added next task %s to queue', + next_task.name) + self.queue.put(next_task) + finally: + self.queue.task_done() class KollaWorker(object): @@ -715,7 +778,7 @@ class KollaWorker(object): parent['children'].append(image) image['parent'] = parent - def build_queue(self): + def build_queue(self, push_queue): """Organizes Queue list Return a list of Queues that have been organized into a hierarchy @@ -729,7 +792,7 @@ class KollaWorker(object): for image in self.images: if image['parent'] is None: - queue.put(image) + queue.put(BuildTask(self.conf, image, push_queue)) LOG.debug('%s:Added image to queue', image['name']) return queue @@ -760,9 +823,6 @@ def run_build(): # to work like we want. A different size or hash will still force a rebuild kolla.set_time() - queue = kolla.build_queue() - push_queue = six.moves.queue.Queue() - if conf.save_dependency: kolla.save_dependency(conf.save_dependency) LOG.info('Docker images dependency is saved in %s', @@ -775,14 +835,20 @@ def run_build(): kolla.list_dependencies() return + push_queue = six.moves.queue.Queue() + queue = kolla.build_queue(push_queue) + workers = [] + for x in six.moves.range(conf.threads): - worker = WorkerThread(queue, push_queue, conf) + worker = WorkerThread(conf, queue) worker.setDaemon(True) worker.start() + workers.append(worker) for x in six.moves.range(conf.push_threads): - push_thread = PushThread(conf, push_queue) - push_thread.start() + worker = WorkerThread(conf, push_queue) + worker.start() + workers.append(worker) # sleep until queue is empty while queue.unfinished_tasks or push_queue.unfinished_tasks: @@ -791,6 +857,12 @@ def run_build(): kolla.summary() kolla.cleanup() + # ensure all threads exited happily + queue.put(WorkerThread.tombstone) + push_queue.put(WorkerThread.tombstone) + for w in workers: + w.join() + return kolla.get_image_statuses() diff --git a/kolla/common/task.py b/kolla/common/task.py new file mode 100644 index 0000000000..255abf85c4 --- /dev/null +++ b/kolla/common/task.py @@ -0,0 +1,42 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class Task(object): + + def __init__(self): + self.success = False + + @abc.abstractproperty + def name(self): + pass + + def reset(self): + self.success = False + + @property + def followups(self): + return [] + + @abc.abstractmethod + def run(self): + pass + + @staticmethod + def set_status(status): + # TODO(harlowja): remove this. + pass diff --git a/kolla/tests/test_build.py b/kolla/tests/test_build.py index cc55bd48aa..d92906cedf 100644 --- a/kolla/tests/test_build.py +++ b/kolla/tests/test_build.py @@ -31,29 +31,36 @@ FAKE_IMAGE = { } -class WorkerThreadTest(base.TestCase): +class TasksTest(base.TestCase): def setUp(self): - super(WorkerThreadTest, self).setUp() + super(TasksTest, self).setUp() self.image = FAKE_IMAGE.copy() # NOTE(jeffrey4l): use a real, temporary dir self.image['path'] = self.useFixture(fixtures.TempDir()).path + @mock.patch.dict(os.environ, clear=True) + @mock.patch('docker.Client') + def test_push_image(self, mock_client): + pusher = build.PushTask(self.conf, self.image) + pusher.run() + mock_client().push.assert_called_once_with( + self.image['fullname'], stream=True, insecure_registry=True) + @mock.patch.dict(os.environ, clear=True) @mock.patch('docker.Client') def test_build_image(self, mock_client): - queue = mock.Mock() push_queue = mock.Mock() - worker = build.WorkerThread(queue, - push_queue, - self.conf) - worker.builder(self.image) + builder = build.BuildTask(self.conf, self.image, push_queue) + builder.run() mock_client().build.assert_called_once_with( path=self.image['path'], tag=self.image['fullname'], nocache=False, rm=True, pull=True, forcerm=True, buildargs=None) + self.assertTrue(builder.success) + @mock.patch.dict(os.environ, clear=True) @mock.patch('docker.Client') def test_build_image_with_build_arg(self, mock_client): @@ -62,33 +69,35 @@ class WorkerThreadTest(base.TestCase): 'NO_PROXY': '127.0.0.1' } self.conf.set_override('build_args', build_args) - worker = build.WorkerThread(mock.Mock(), - mock.Mock(), - self.conf) - worker.builder(self.image) + push_queue = mock.Mock() + builder = build.BuildTask(self.conf, self.image, push_queue) + builder.run() mock_client().build.assert_called_once_with( path=self.image['path'], tag=self.image['fullname'], nocache=False, rm=True, pull=True, forcerm=True, buildargs=build_args) + self.assertTrue(builder.success) + @mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'}, clear=True) @mock.patch('docker.Client') def test_build_arg_from_env(self, mock_client): + push_queue = mock.Mock() build_args = { 'http_proxy': 'http://FROM_ENV:8080', } - worker = build.WorkerThread(mock.Mock(), - mock.Mock(), - self.conf) - worker.builder(self.image) + builder = build.BuildTask(self.conf, self.image, push_queue) + builder.run() mock_client().build.assert_called_once_with( path=self.image['path'], tag=self.image['fullname'], nocache=False, rm=True, pull=True, forcerm=True, buildargs=build_args) + self.assertTrue(builder.success) + @mock.patch.dict(os.environ, {'http_proxy': 'http://FROM_ENV:8080'}, clear=True) @mock.patch('docker.Client') @@ -97,30 +106,30 @@ class WorkerThreadTest(base.TestCase): 'http_proxy': 'http://localhost:8080', } self.conf.set_override('build_args', build_args) - worker = build.WorkerThread(mock.Mock(), - mock.Mock(), - self.conf) - worker.builder(self.image) + + push_queue = mock.Mock() + builder = build.BuildTask(self.conf, self.image, push_queue) + builder.run() mock_client().build.assert_called_once_with( path=self.image['path'], tag=self.image['fullname'], nocache=False, rm=True, pull=True, forcerm=True, buildargs=build_args) + self.assertTrue(builder.success) + @mock.patch('docker.Client') @mock.patch('requests.get') def test_requests_get_timeout(self, mock_get, mock_client): - worker = build.WorkerThread(mock.Mock(), - mock.Mock(), - self.conf) self.image['source'] = { 'source': 'http://fake/source', 'type': 'url', 'name': 'fake-image-base' } + push_queue = mock.Mock() + builder = build.BuildTask(self.conf, self.image, push_queue) mock_get.side_effect = requests.exceptions.Timeout - get_result = worker.process_source(self.image, - self.image['source']) + get_result = builder.process_source(self.image, self.image['source']) self.assertIsNone(get_result) self.assertEqual(self.image['status'], 'error') @@ -128,6 +137,8 @@ class WorkerThreadTest(base.TestCase): mock_get.assert_called_once_with(self.image['source']['source'], timeout=120) + self.assertFalse(builder.success) + class KollaWorkerTest(base.TestCase):