From 2145418c55b0bba909a617e3844724659e5ecb26 Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Mon, 5 Jun 2017 14:15:18 -0400 Subject: [PATCH] Add log streaming test Test that our finger log streamer will correctly find the log file and stream its contents to a client. Change-Id: Ia153e2d436855dcce256d5fc91d0fd4c47116042 --- .../git/common-config/playbooks/python27.yaml | 11 ++ .../streamer/git/common-config/zuul.yaml | 17 +++ .../streamer/git/org_project/.zuul.yaml | 5 + .../config/streamer/git/org_project/README | 1 + tests/fixtures/config/streamer/main.yaml | 8 ++ tests/unit/test_log_streamer.py | 105 +++++++++++++++++- 6 files changed, 144 insertions(+), 3 deletions(-) create mode 100644 tests/fixtures/config/streamer/git/common-config/playbooks/python27.yaml create mode 100644 tests/fixtures/config/streamer/git/common-config/zuul.yaml create mode 100644 tests/fixtures/config/streamer/git/org_project/.zuul.yaml create mode 100644 tests/fixtures/config/streamer/git/org_project/README create mode 100644 tests/fixtures/config/streamer/main.yaml diff --git a/tests/fixtures/config/streamer/git/common-config/playbooks/python27.yaml b/tests/fixtures/config/streamer/git/common-config/playbooks/python27.yaml new file mode 100644 index 0000000000..753e7e2e26 --- /dev/null +++ b/tests/fixtures/config/streamer/git/common-config/playbooks/python27.yaml @@ -0,0 +1,11 @@ +# NOTE(Shrews): Do not run any tasks that will need zuul_console to stream +# output because that will not work. Since we just need any output in our +# ansible log, the test coordination tasks should be sufficient. +- hosts: all + tasks: + - debug: var=waitpath + + # Do not finish until test creates the flag file + - wait_for: + state: present + path: "{{waitpath}}" diff --git a/tests/fixtures/config/streamer/git/common-config/zuul.yaml b/tests/fixtures/config/streamer/git/common-config/zuul.yaml new file mode 100644 index 0000000000..d8df96a288 --- /dev/null +++ b/tests/fixtures/config/streamer/git/common-config/zuul.yaml @@ -0,0 +1,17 @@ +- pipeline: + name: check + manager: independent + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + verified: 1 + failure: + gerrit: + verified: -1 + +- job: + name: python27 + vars: + waitpath: '{{zuul._test.test_root}}/{{zuul.uuid}}/test_wait' diff --git a/tests/fixtures/config/streamer/git/org_project/.zuul.yaml b/tests/fixtures/config/streamer/git/org_project/.zuul.yaml new file mode 100644 index 0000000000..89a567472e --- /dev/null +++ b/tests/fixtures/config/streamer/git/org_project/.zuul.yaml @@ -0,0 +1,5 @@ +- project: + name: org/project + check: + jobs: + - python27 diff --git a/tests/fixtures/config/streamer/git/org_project/README b/tests/fixtures/config/streamer/git/org_project/README new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/streamer/git/org_project/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/streamer/main.yaml b/tests/fixtures/config/streamer/main.yaml new file mode 100644 index 0000000000..208e274b13 --- /dev/null +++ b/tests/fixtures/config/streamer/main.yaml @@ -0,0 +1,8 @@ +- tenant: + name: tenant-one + source: + gerrit: + config-projects: + - common-config + untrusted-projects: + - org/project diff --git a/tests/unit/test_log_streamer.py b/tests/unit/test_log_streamer.py index 3ea5a8e50c..f422e97b26 100644 --- a/tests/unit/test_log_streamer.py +++ b/tests/unit/test_log_streamer.py @@ -14,9 +14,12 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +import os +import os.path import socket import tempfile +import threading +import time import zuul.lib.log_streamer import tests.base @@ -24,8 +27,6 @@ import tests.base class TestLogStreamer(tests.base.BaseTestCase): - log = logging.getLogger("zuul.test.cloner") - def setUp(self): super(TestLogStreamer, self).setUp() self.host = '0.0.0.0' @@ -51,3 +52,101 @@ class TestLogStreamer(tests.base.BaseTestCase): self.addCleanup(s.close) self.assertNotEqual(0, s.connect_ex((self.host, port))) s.close() + + +class TestStreaming(tests.base.AnsibleZuulTestCase): + + tenant_config_file = 'config/streamer/main.yaml' + + def setUp(self): + super(TestStreaming, self).setUp() + self.host = '0.0.0.0' + self.streamer = None + self.stop_streamer = False + self.streaming_data = '' + + def stopStreamer(self): + self.stop_streamer = True + + def startStreamer(self, port, build_uuid, root=None): + if not root: + root = tempfile.gettempdir() + self.streamer = zuul.lib.log_streamer.LogStreamer(None, self.host, + port, root) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect((self.host, port)) + self.addCleanup(s.close) + + req = '%s\n' % build_uuid + s.sendall(req.encode('utf-8')) + + while not self.stop_streamer: + data = s.recv(2048) + if not data: + break + self.streaming_data += data.decode('utf-8') + + s.shutdown(socket.SHUT_RDWR) + s.close() + self.streamer.stop() + + def test_streaming(self): + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + # We don't have any real synchronization for the ansible jobs, so + # just wait until we get our running build. + while not len(self.builds): + time.sleep(0.1) + build = self.builds[0] + self.assertEqual(build.name, 'python27') + + build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid) + while not os.path.exists(build_dir): + time.sleep(0.1) + + # Need to wait to make sure that jobdir gets set + while build.jobdir is None: + time.sleep(0.1) + build = self.builds[0] + + # Wait for the job to begin running and create the ansible log file. + # The job waits to complete until the flag file exists, so we can + # safely access the log here. We only open it (to force a file handle + # to be kept open for it after the job finishes) but wait to read the + # contents until the job is done. + ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt') + while not os.path.exists(ansible_log): + time.sleep(0.1) + logfile = open(ansible_log, 'r') + self.addCleanup(logfile.close) + + # Create a thread to stream the log. We need this to be happening + # before we create the flag file to tell the job to complete. + port = 7901 + streamer_thread = threading.Thread( + target=self.startStreamer, + args=(port, build.uuid, self.executor_server.jobdir_root,) + ) + streamer_thread.start() + self.addCleanup(self.stopStreamer) + + # Allow the job to complete, which should close the streaming + # connection (and terminate the thread) as well since the log file + # gets closed/deleted. + flag_file = os.path.join(build_dir, 'test_wait') + open(flag_file, 'w').close() + self.waitUntilSettled() + streamer_thread.join() + + # Now that the job is finished, the log file has been closed by the + # job and deleted. However, we still have a file handle to it, so we + # can make sure that we read the entire contents at this point. + file_contents = logfile.readlines() + logfile.close() + + # Compact the returned lines into a single string for easy comparison. + orig = ''.join(file_contents) + self.log.debug("\n\nFile contents: %s\n\n", orig) + self.log.debug("\n\nStreamed: %s\n\n", self.streaming_data) + self.assertEqual(orig, self.streaming_data)