
Make subunit2sql population single threaded

This commit switches the gearman worker to handle both retrieving the
subunit file and populating the DB with that data in a single thread,
as opposed to pushing the stream onto an in-memory queue and processing
it in a separate thread. This should have two major benefits. First, it
makes the behavior much more consistent: we only report the work as
complete to the gearman server after the data is already in the DB.
Second, it should significantly reduce our memory consumption, since
streams no longer pile up in a queue while waiting to be processed.

Change-Id: Ifaac2f150e26158b0a22e1c33e0c5d39c10182a4
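
In effect, the change replaces a producer/consumer pair of threads with one
sequential loop. A minimal sketch of the two shapes, in the spirit of the
diff below; fetch_subunit, write_to_db, and job are illustrative
placeholders, not functions from this repo:

def retriever_loop(job, subunit_queue):      # old design: producer thread
    while True:
        event = fetch_subunit(job)           # pull the stream via gearman
        subunit_queue.put(event)             # hand off to the other thread
        job.sendWorkComplete()               # acked before the DB write!

def processor_loop(subunit_queue):           # old design: consumer thread
    while True:
        write_to_db(subunit_queue.get())     # DB write can lag the ack

def worker_loop(job):                        # new design: one thread
    while True:
        event = fetch_subunit(job)
        write_to_db(event)                   # data lands in the DB first
        job.sendWorkComplete()               # the ack now means "in the DB"

The in-memory backlog also disappears: nothing buffers streams between the
fetch and the DB write.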
Matthew Treinish committed 3 years ago
parent commit d31b5d9710
1 changed file with 44 additions and 66 deletions

files/subunit-gearman-worker.py  +44 -66

@@ -22,7 +22,6 @@ import io
 import json
 import logging
 import os
-import Queue
 import socket
 import threading
 import time
@@ -56,11 +55,50 @@ class FilterException(Exception):
 
 
 class SubunitRetriever(threading.Thread):
-    def __init__(self, gearman_worker, filters, subunitq):
+    def __init__(self, gearman_worker, filters, subunit2sql_conf):
         threading.Thread.__init__(self)
         self.gearman_worker = gearman_worker
         self.filters = filters
-        self.subunitq = subunitq
+        # Initialize subunit2sql settings
+        self.config = subunit2sql_conf
+        shell.cli_opts()
+        extensions = shell.get_extensions()
+        shell.parse_args([], [self.config])
+        self.extra_targets = shell.get_targets(extensions)
+
+    def _write_to_db(self, subunit):
+        subunit_v2 = subunit.pop('subunit')
+        # Set run metadata from gearman
+        log_url = subunit.pop('log_url', None)
+        if log_url:
+            log_dir = os.path.dirname(log_url)
+
+            # log_dir should be the top-level directory containing a job run,
+            # but the subunit file may be nested in 0 - 2 subdirectories (top,
+            # logs/, or logs/old/), so we need to safely correct the path here
+            log_base = os.path.basename(log_dir)
+            if log_base == 'logs':
+                log_dir = os.path.dirname(log_dir)
+            elif log_base == 'old':
+                log_dir = os.path.dirname(os.path.dirname(log_dir))
+
+            shell.CONF.set_override('artifacts', log_dir)
+        shell.CONF.set_override('run_meta', subunit)
+        # Parse subunit stream and store in DB
+        if subunit_v2.closed:
+            logging.debug('Trying to convert closed subunit v2 stream: %s to '
+                          'SQL' % subunit_v2)
+        else:
+            logging.debug('Converting Subunit V2 stream: %s to SQL' %
+                          subunit_v2)
+        stream = read_subunit.ReadSubunit(subunit_v2,
+                                          targets=self.extra_targets)
+        results = stream.get_results()
+        start_time = sorted(
+            [results[x]['start_time'] for x in results if x != 'run_time'])[0]
+        shell.CONF.set_override('run_at', start_time.isoformat())
+        shell.process_results(results)
+        subunit_v2.close()
 
     def run(self):
         while True:
@@ -93,7 +131,7 @@ class SubunitRetriever(threading.Thread):
                         logging.debug("Pushing subunit file: %s" % subunit_io)
                     out_event = fields.copy()
                     out_event["subunit"] = subunit_io
-                    self.subunitq.put(out_event)
+                    self._write_to_db(out_event)
                     job.sendWorkComplete()
         except Exception as e:
             logging.exception("Exception handling log event.")
@@ -163,53 +201,6 @@ class SubunitRetriever(threading.Thread):
         return gzipped, raw_buf
 
 
-class Subunit2SQLProcessor(object):
-    def __init__(self, subunitq, subunit2sql_conf):
-        self.subunitq = subunitq
-        self.config = subunit2sql_conf
-        # Initialize subunit2sql settings
-        shell.cli_opts()
-        extensions = shell.get_extensions()
-        shell.parse_args([], [self.config])
-        self.extra_targets = shell.get_targets(extensions)
-
-    def handle_subunit_event(self):
-        # Pull subunit event from queue and separate stream from metadata
-        subunit = self.subunitq.get()
-        subunit_v2 = subunit.pop('subunit')
-        # Set run metadata from gearman
-        log_url = subunit.pop('log_url', None)
-        if log_url:
-            log_dir = os.path.dirname(log_url)
-
-            # log_dir should be the top-level directory containing a job run,
-            # but the subunit file may be nested in 0 - 2 subdirectories (top,
-            # logs/, or logs/old/), so we need to safely correct the path here
-            log_base = os.path.basename(log_dir)
-            if log_base == 'logs':
-                log_dir = os.path.dirname(log_dir)
-            elif log_base == 'old':
-                log_dir = os.path.dirname(os.path.dirname(log_dir))
-
-            shell.CONF.set_override('artifacts', log_dir)
-        shell.CONF.set_override('run_meta', subunit)
-        # Parse subunit stream and store in DB
-        if subunit_v2.closed:
-            logging.debug('Trying to convert closed subunit v2 stream: %s to '
-                          'SQL' % subunit_v2)
-        else:
-            logging.debug('Converting Subunit V2 stream: %s to SQL' %
-                          subunit_v2)
-        stream = read_subunit.ReadSubunit(subunit_v2,
-                                          targets=self.extra_targets)
-        results = stream.get_results()
-        start_time = sorted(
-            [results[x]['start_time'] for x in results if x != 'run_time'])[0]
-        shell.CONF.set_override('run_at', start_time.isoformat())
-        shell.process_results(results)
-        subunit_v2.close()
-
-
 class Server(object):
     def __init__(self, config, debuglog):
         # Config init.
@@ -219,8 +210,6 @@ class Server(object):
         # Python logging output file.
         self.debuglog = debuglog
         self.retriever = None
-        self.subunitqueue = Queue.Queue(131072)
-        self.processor = None
         self.filter_factories = []
 
     def setup_logging(self):
@@ -238,28 +227,17 @@ class Server(object):
         gearman_worker.addServer(self.gearman_host,
                                  self.gearman_port)
         gearman_worker.registerFunction(b'push-subunit')
+        subunit2sql_conf = self.config['config']
         self.retriever = SubunitRetriever(gearman_worker,
                                           self.filter_factories,
-                                          self.subunitqueue)
-
-    def setup_processor(self):
-        subunit2sql_config = self.config['config']
-        self.processor = Subunit2SQLProcessor(self.subunitqueue,
-                                              subunit2sql_config)
+                                          subunit2sql_conf)
 
     def main(self):
         self.setup_retriever()
-        self.setup_processor()
 
         self.retriever.daemon = True
         self.retriever.start()
 
-        while True:
-            try:
-                self.processor.handle_subunit_event()
-            except:
-                logging.exception("Exception processing log event.")
-
 
 def main():
     parser = argparse.ArgumentParser()
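
For reference, the subunit2sql entry points that the new _write_to_db()
leans on can also be driven standalone. A minimal sketch using only the
calls that appear in the diff above; the config and stream file paths are
hypothetical:

import io

from subunit2sql import read_subunit
from subunit2sql import shell

# One-time setup, mirroring SubunitRetriever.__init__().
shell.cli_opts()
extensions = shell.get_extensions()
shell.parse_args([], ['/etc/subunit2sql.conf'])    # hypothetical config path
targets = shell.get_targets(extensions)

# Per-stream conversion, mirroring _write_to_db().
with open('testrepository.subunit', 'rb') as f:    # hypothetical stream file
    subunit_v2 = io.BytesIO(f.read())
stream = read_subunit.ReadSubunit(subunit_v2, targets=targets)
results = stream.get_results()
start_time = sorted(
    results[k]['start_time'] for k in results if k != 'run_time')[0]
shell.CONF.set_override('run_at', start_time.isoformat())
shell.process_results(results)
subunit_v2.close()

Note that get_results() consumes the stream, which is why the worker only
closes subunit_v2 after process_results() has stored the run.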
