Puppet module for Subunit2sql

subunit-gearman-worker.py (9.7 KB)
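
This script is a Gearman worker: it registers the 'push-subunit' function, fetches the subunit v2 stream named in each job's source_url (preferring a gzipped copy when one is available), and hands the stream to subunit2sql, which parses the test results into a SQL database. It can run in the foreground or daemonize under a PID file.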

#!/usr/bin/python2
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import argparse
import cStringIO
import daemon
import gear
import gzip
import json
import logging
import os
import Queue
import socket
import threading
import time
import urllib2
import yaml

from subunit2sql import read_subunit
from subunit2sql import shell

try:
    import daemon.pidlockfile as pidfile_mod
except ImportError:
    import daemon.pidfile as pidfile_mod
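# Note: the try/except above follows the pidlockfile -> pidfile module
# rename across python-daemon releases, and the cStringIO/Queue/urllib2
# imports are what pin this script to Python 2.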


def semi_busy_wait(seconds):
    # time.sleep() may return early. If it does, sleep() again and repeat
    # until at least the number of seconds specified has elapsed.
    start_time = time.time()
    while True:
        time.sleep(seconds)
        cur_time = time.time()
        seconds = seconds - (cur_time - start_time)
        if seconds <= 0.0:
            return


class FilterException(Exception):
    pass


class SubunitRetriever(threading.Thread):
    def __init__(self, gearman_worker, filters, subunitq):
        threading.Thread.__init__(self)
        self.gearman_worker = gearman_worker
        self.filters = filters
        self.subunitq = subunitq

    def run(self):
        while True:
            try:
                self._handle_event()
            except Exception:
                logging.exception("Exception retrieving log event.")

    def _handle_event(self):
        job = self.gearman_worker.getJob()
        try:
            arguments = json.loads(job.arguments.decode('utf-8'))
            source_url = arguments['source_url']
            retry = arguments['retry']
            event = arguments['event']
            logging.debug("Handling event: " + json.dumps(event))
            fields = event.get('fields') or event.get('@fields')
            if fields.pop('build_status') != 'ABORTED':
                # Handle events ignoring aborted builds. These builds are
                # discarded by zuul.
                subunit_io = self._retrieve_subunit_v2(source_url, retry)
                if not subunit_io:
                    job.sendWorkException(
                        'Unable to retrieve subunit stream'.encode('utf8'))
                    # Don't fall through to sendWorkComplete() after
                    # reporting a failure.
                    return
                logging.debug("Pushing subunit files.")
                out_event = fields.copy()
                out_event["subunit"] = subunit_io
                self.subunitq.put(out_event)
            job.sendWorkComplete()
        except Exception as e:
            logging.exception("Exception handling log event.")
            job.sendWorkException(str(e).encode('utf-8'))
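
    # A 'push-subunit' job payload is JSON of roughly this shape (inferred
    # from the parsing in _handle_event above; the URL and field values are
    # illustrative only):
    #
    #     {"source_url": "https://logs.example.org/.../testrepository.subunit",
    #      "retry": false,
    #      "event": {"fields": {"build_status": "SUCCESS",
    #                           "log_url": "https://logs.example.org/..."}}}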

    def _retrieve_subunit_v2(self, source_url, retry):
        # TODO (clarkb): This should check the content type instead of file
        # extension for determining if gzip was used.
        gzipped = False
        raw_buf = b''
        try:
            gzipped, raw_buf = self._get_subunit_data(source_url, retry)
        except urllib2.HTTPError as e:
            if e.code == 404:
                logging.info("Unable to retrieve %s: HTTP error 404" %
                             source_url)
            else:
                logging.exception("Unable to get log data.")
            return None
        except Exception:
            # Silently drop fatal errors when retrieving logs.
            # TODO (clarkb): Handle these errors.
            # Perhaps simply add a log message to raw_buf?
            logging.exception("Unable to get log data.")
            return None
        if gzipped:
            logging.debug("Decompressing gzipped source file.")
            raw_strIO = cStringIO.StringIO(raw_buf)
            f = gzip.GzipFile(fileobj=raw_strIO)
            buf = cStringIO.StringIO(f.read())
            raw_strIO.close()
            f.close()
        else:
            logging.debug("Decoding source file.")
            buf = cStringIO.StringIO(raw_buf)
        return buf

    def _get_subunit_data(self, source_url, retry):
        gzipped = False
        try:
            # TODO(clarkb): We really should be using requests instead
            # of urllib2. urllib2 will automatically perform a POST
            # instead of a GET if we provide urlencoded data to urlopen
            # but we need to do a GET. The parameters are currently
            # hardcoded so this should be ok for now.
            logging.debug("Retrieving: " + source_url + ".gz")
            req = urllib2.Request(source_url + ".gz")
            req.add_header('Accept-encoding', 'gzip')
            r = urllib2.urlopen(req)
        except urllib2.URLError:
            try:
                # Fall back on GETting unzipped data.
                logging.debug("Retrieving: " + source_url)
                r = urllib2.urlopen(source_url)
            except Exception:
                logging.exception("Unable to retrieve source file.")
                raise
        except Exception:
            logging.exception("Unable to retrieve source file.")
            raise
        if ('gzip' in r.info().get('Content-Type', '') or
                'gzip' in r.info().get('Content-Encoding', '')):
            gzipped = True
        raw_buf = r.read()
        # Hack to read all of Jenkins console logs as they upload.
        return gzipped, raw_buf


class Subunit2SQLProcessor(object):
    def __init__(self, subunitq, subunit2sql_conf):
        self.subunitq = subunitq
        self.config = subunit2sql_conf
        # Initialize subunit2sql settings.
        shell.cli_opts()
        extensions = shell.get_extensions()
        shell.parse_args([], [self.config])
        self.extra_targets = shell.get_targets(extensions)

    def handle_subunit_event(self):
        # Pull a subunit event from the queue and separate the stream
        # from its metadata.
        subunit = self.subunitq.get()
        subunit_v2 = subunit.pop('subunit')
        # Set run metadata from gearman.
        log_url = subunit.pop('log_url', None)
        if log_url:
            log_dir = os.path.dirname(os.path.dirname(log_url))
            shell.CONF.set_override('artifacts', log_dir)
        shell.CONF.set_override('run_meta', subunit)
        # Parse the subunit stream and store it in the DB.
        logging.debug('Converting Subunit V2 stream to SQL')
        stream = read_subunit.ReadSubunit(subunit_v2,
                                          targets=self.extra_targets)
        shell.process_results(stream.get_results())
        subunit_v2.close()


class Server(object):
    def __init__(self, config, debuglog):
        # Config init.
        self.config = config
        self.gearman_host = self.config['gearman-host']
        self.gearman_port = self.config['gearman-port']
        # Python logging output file.
        self.debuglog = debuglog
        self.retriever = None
        self.subunitqueue = Queue.Queue(131072)
        self.processor = None
        self.filter_factories = []

    def setup_logging(self):
        if self.debuglog:
            logging.basicConfig(format='%(asctime)s %(message)s',
                                filename=self.debuglog,
                                level=logging.DEBUG)
        else:
            # Prevent leakage into the logstash log stream.
            logging.basicConfig(level=logging.CRITICAL)
        logging.debug("Log pusher starting.")

    def setup_retriever(self):
        hostname = socket.gethostname()
        gearman_worker = gear.Worker(hostname + b'-pusher')
        gearman_worker.addServer(self.gearman_host,
                                 self.gearman_port)
        gearman_worker.registerFunction(b'push-subunit')
        self.retriever = SubunitRetriever(gearman_worker,
                                          self.filter_factories,
                                          self.subunitqueue)

    def setup_processor(self):
        # Note this processor will not work if the process is run as a
        # daemon. You must use the --foreground option.
        subunit2sql_config = self.config['config']
        self.processor = Subunit2SQLProcessor(self.subunitqueue,
                                              subunit2sql_config)

    def main(self):
        self.setup_retriever()
        self.setup_processor()
        self.retriever.daemon = True
        self.retriever.start()
        while True:
            try:
                self.processor.handle_subunit_event()
            except Exception:
                logging.exception("Exception processing log event.")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", required=True,
                        help="Path to yaml config file.")
    parser.add_argument("-d", "--debuglog",
                        help="Enable debug log. "
                             "Specifies file to write log to.")
    parser.add_argument("--foreground", action='store_true',
                        help="Run in the foreground.")
    parser.add_argument("-p", "--pidfile",
                        default="/var/run/jenkins-subunit-pusher/"
                                "jenkins-subunit-gearman-worker.pid",
                        help="PID file to lock during daemonization.")
    args = parser.parse_args()

    with open(args.config, 'r') as config_stream:
        # The config file is flat YAML; safe_load avoids constructing
        # arbitrary Python objects from it.
        config = yaml.safe_load(config_stream)
    server = Server(config, args.debuglog)

    if args.foreground:
        server.setup_logging()
        server.main()
    else:
        pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10)
        with daemon.DaemonContext(pidfile=pidfile):
            server.setup_logging()
            server.main()


if __name__ == '__main__':
    main()
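
For reference, Server reads three keys from the YAML file passed via
--config: 'gearman-host', 'gearman-port', and 'config' (the path to a
subunit2sql configuration file handed to shell.parse_args). A minimal
sketch, with hypothetical host and paths:

    gearman-host: gearman.example.org
    gearman-port: 4730
    config: /etc/subunit2sql/subunit2sql.conf

A foreground invocation for testing might then look like this (paths again
hypothetical):

    subunit-gearman-worker.py -c /etc/subunit-worker.yaml \
        -d /var/log/subunit-worker.debug --foreground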