The Gatekeeper, or a project gating system

server.py 105KB

# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import datetime
import json
import logging
import os
import shutil
import signal
import shlex
import socket
import subprocess
import tempfile
import threading
import time
import traceback

import git
from urllib.parse import urlsplit

from zuul.lib.yamlutil import yaml
from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd
from zuul.lib import filecomments

try:
    import ara.plugins.callbacks as ara_callbacks
except ImportError:
    ara_callbacks = None
import gear

import zuul.merger.merger
import zuul.ansible.logconfig
from zuul.executor.sensors.cpu import CPUSensor
from zuul.executor.sensors.hdd import HDDSensor
from zuul.executor.sensors.pause import PauseSensor
from zuul.executor.sensors.startingbuilds import StartingBuildsSensor
from zuul.executor.sensors.ram import RAMSensor
from zuul.lib import commandsocket

BUFFER_LINES_FOR_SYNTAX = 200
COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose',
            'unverbose', 'keep', 'nokeep']
DEFAULT_FINGER_PORT = 7900
BLACKLISTED_ANSIBLE_CONNECTION_TYPES = [
    'network_cli', 'kubectl', 'project', 'namespace']


class StopException(Exception):
    """An exception raised when an inner loop is asked to stop."""
    pass


class ExecutorError(Exception):
    """A non-transient run-time executor error

    This class represents error conditions detected by the executor
    when preparing to run a job which we know are consistently fatal.
    Zuul should not reschedule the build in these cases.
    """
    pass


class RoleNotFoundError(ExecutorError):
    pass


class DiskAccountant(object):
    ''' A single thread to periodically run du and monitor a base directory

    Whenever the accountant notices a dir over limit, it will call the
    given func with an argument of the job directory. That function
    should be used to remediate the problem (generally by killing the
    job producing the disk bloat). The function will be called every
    time the problem is noticed, so it should be handled synchronously
    to avoid stacking up calls.
    '''
    log = logging.getLogger("zuul.ExecutorDiskAccountant")

    def __init__(self, jobs_base, limit, func, cache_dir, usage_func=None):
        '''
        :param str jobs_base: absolute path name of dir to be monitored
        :param int limit: maximum number of MB allowed to be in use in any one
                          subdir
        :param callable func: Function to call with overlimit dirs
        :param str cache_dir: absolute path name of dir to be passed as the
                              first argument to du. This will ensure du does
                              not count any hardlinks to files in this
                              directory against a single job.
        :param callable usage_func: Optional function to call with usage
                                    for every dir _NOT_ over limit
        '''
        # Don't cross the streams
        if cache_dir == jobs_base:
            raise Exception("Cache dir and jobs dir cannot be the same")
        self.thread = threading.Thread(target=self._run,
                                       name='diskaccountant')
        self.thread.daemon = True
        self._running = False
        self.jobs_base = jobs_base
        self.limit = limit
        self.func = func
        self.cache_dir = cache_dir
        self.usage_func = usage_func
        self.stop_event = threading.Event()

    def _run(self):
        while self._running:
            # Walk job base
            before = time.time()
            du = subprocess.Popen(
                ['du', '-m', '--max-depth=1', self.cache_dir, self.jobs_base],
                stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
            for line in du.stdout:
                (size, dirname) = line.rstrip().split()
                dirname = dirname.decode('utf8')
                if dirname == self.jobs_base or dirname == self.cache_dir:
                    continue
                if os.path.dirname(dirname) == self.cache_dir:
                    continue
                size = int(size)
                if size > self.limit:
                    self.log.info(
                        "{job} is using {size}MB (limit={limit})"
                        .format(size=size, job=dirname, limit=self.limit))
                    self.func(dirname)
                elif self.usage_func:
                    self.log.debug(
                        "{job} is using {size}MB (limit={limit})"
                        .format(size=size, job=dirname, limit=self.limit))
                    self.usage_func(dirname, size)
            du.wait()
            after = time.time()
            # Sleep half as long as that took, or 1s, whichever is longer
            delay_time = max((after - before) / 2, 1.0)
            self.stop_event.wait(delay_time)

    def start(self):
        self._running = True
        self.thread.start()

    def stop(self):
        self._running = False
        self.stop_event.set()
        # We join here to avoid whitelisting the thread -- if it takes more
        # than 5s to stop in tests, there's a problem.
        self.thread.join(timeout=5)
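

# Usage sketch (illustrative, not part of the upstream file; paths and
# the callback name are hypothetical). The executor points a
# DiskAccountant at its builds directory and remediates in the callback:
#
#   def _over_limit(jobdir):
#       ...  # e.g. abort the build that is writing to jobdir
#
#   accountant = DiskAccountant('/var/lib/zuul/builds', 250, _over_limit,
#                               '/var/lib/zuul/git-cache')
#   accountant.start()   # polls du; calls _over_limit for offenders
#   ...
#   accountant.stop()    # sets the event so the loop exits promptly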


class Watchdog(object):
    def __init__(self, timeout, function, args):
        self.timeout = timeout
        self.function = function
        self.args = args
        self.thread = threading.Thread(target=self._run,
                                       name='watchdog')
        self.thread.daemon = True
        self.timed_out = None

    def _run(self):
        while self._running and time.time() < self.end:
            time.sleep(10)
        if self._running:
            self.timed_out = True
            self.function(*self.args)
        else:
            # Only set timed_out to false if we aren't _running
            # anymore. This means that we stopped running not because
            # of a timeout but because normal execution ended.
            self.timed_out = False

    def start(self):
        self._running = True
        self.end = time.time() + self.timeout
        self.thread.start()

    def stop(self):
        self._running = False
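

# Usage sketch (illustrative; "proc" is hypothetical). start(), not
# __init__, arms the timer, and timed_out stays None until _run ends:
#
#   watchdog = Watchdog(3600, proc.terminate, ())
#   watchdog.start()
#   proc.wait()
#   watchdog.stop()
#   if watchdog.timed_out:
#       ...  # the function was invoked; treat as a timeout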


class SshAgent(object):
    log = logging.getLogger("zuul.ExecutorServer")

    def __init__(self):
        self.env = {}
        self.ssh_agent = None

    def start(self):
        if self.ssh_agent:
            return
        with open('/dev/null', 'r+') as devnull:
            ssh_agent = subprocess.Popen(['ssh-agent'], close_fds=True,
                                         stdout=subprocess.PIPE,
                                         stderr=devnull,
                                         stdin=devnull)
        (output, _) = ssh_agent.communicate()
        output = output.decode('utf8')
        for line in output.split("\n"):
            if '=' in line:
                line = line.split(";", 1)[0]
                (key, value) = line.split('=')
                self.env[key] = value
        self.log.info('Started SSH Agent, {}'.format(self.env))

    def stop(self):
        if 'SSH_AGENT_PID' in self.env:
            try:
                os.kill(int(self.env['SSH_AGENT_PID']), signal.SIGTERM)
            except OSError:
                self.log.exception(
                    'Problem sending SIGTERM to agent {}'.format(self.env))
            self.log.debug('Sent SIGTERM to SSH Agent, {}'.format(self.env))
            self.env = {}

    def add(self, key_path):
        env = os.environ.copy()
        env.update(self.env)
        key_path = os.path.expanduser(key_path)
        self.log.debug('Adding SSH Key {}'.format(key_path))
        try:
            subprocess.check_output(['ssh-add', key_path], env=env,
                                    stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            self.log.exception('ssh-add failed. stdout: %s, stderr: %s',
                               e.output, e.stderr)
            raise
        self.log.info('Added SSH Key {}'.format(key_path))

    def addData(self, name, key_data):
        env = os.environ.copy()
        env.update(self.env)
        self.log.debug('Adding SSH Key {}'.format(name))
        try:
            subprocess.check_output(['ssh-add', '-'], env=env,
                                    input=key_data.encode('utf8'),
                                    stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            self.log.exception('ssh-add failed. stdout: %s, stderr: %s',
                               e.output, e.stderr)
            raise
        self.log.info('Added SSH Key {}'.format(name))

    def remove(self, key_path):
        env = os.environ.copy()
        env.update(self.env)
        key_path = os.path.expanduser(key_path)
        self.log.debug('Removing SSH Key {}'.format(key_path))
        subprocess.check_output(['ssh-add', '-d', key_path], env=env,
                                stderr=subprocess.PIPE)
        self.log.info('Removed SSH Key {}'.format(key_path))

    def list(self):
        if 'SSH_AUTH_SOCK' not in self.env:
            return None
        env = os.environ.copy()
        env.update(self.env)
        result = []
        for line in subprocess.Popen(['ssh-add', '-L'], env=env,
                                     stdout=subprocess.PIPE).stdout:
            line = line.decode('utf8')
            if line.strip() == 'The agent has no identities.':
                break
            result.append(line.strip())
        return result
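

# Usage sketch (illustrative; the key names are hypothetical). Each
# build gets its own agent so keys never leak between jobs; see
# AnsibleJob.execute() below for the real call sequence:
#
#   agent = SshAgent()
#   agent.start()                        # spawn ssh-agent, capture its env
#   agent.add('~/.ssh/id_rsa')           # ssh-add a key file
#   agent.addData('job-key', key_data)   # ssh-add key material via stdin
#   print(agent.list())                  # public keys, or None if no agent
#   agent.stop()                         # SIGTERM the agent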


class JobDirPlaybook(object):
    def __init__(self, root):
        self.root = root
        self.trusted = None
        self.project_canonical_name = None
        self.branch = None
        self.canonical_name_and_path = None
        self.path = None
        self.roles = []
        self.roles_path = []
        self.ansible_config = os.path.join(self.root, 'ansible.cfg')
        self.project_link = os.path.join(self.root, 'project')
        self.secrets_root = os.path.join(self.root, 'secrets')
        os.makedirs(self.secrets_root)
        self.secrets = os.path.join(self.secrets_root, 'secrets.yaml')
        self.secrets_content = None

    def addRole(self):
        count = len(self.roles)
        root = os.path.join(self.root, 'role_%i' % (count,))
        os.makedirs(root)
        self.roles.append(root)
        return root
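

# Layout note (illustrative): successive addRole() calls allocate
#   <root>/role_0, <root>/role_1, ...
# and return the new path; roles_path is presumably filled in later,
# when role checkouts are prepared and ansible.cfg is written (the
# role-preparation code is outside this excerpt).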


class JobDir(object):
    def __init__(self, root, keep, build_uuid):
        '''
        :param str root: Root directory for the individual job directories.
            Can be None to use the default system temp root directory.
        :param bool keep: If True, do not delete the job directory.
        :param str build_uuid: The unique build UUID. If supplied, this will
            be used as the temp job directory name. Using this will help the
            log streaming daemon find job logs.
        '''
        # root
        #   ansible (mounted in bwrap read-only)
        #     logging.json
        #     inventory.yaml
        #     extra_vars.yaml
        #   .ansible (mounted in bwrap read-write)
        #     fact-cache/localhost
        #     cp
        #   playbook_0 (mounted in bwrap for each playbook read-only)
        #     secrets.yaml
        #     project -> ../trusted/project_0/...
        #     role_0 -> ../trusted/project_0/...
        #   trusted (mounted in bwrap read-only)
        #     project_0
        #       <git.example.com>
        #         <project>
        #   untrusted (mounted in bwrap read-only)
        #     project_0
        #       <git.example.com>
        #         <project>
        #   work (mounted in bwrap read-write)
        #     .ssh
        #       known_hosts
        #     src
        #       <git.example.com>
        #         <project>
        #     logs
        #       job-output.txt
        #     tmp
        #     results.json
        self.keep = keep
        if root:
            tmpdir = root
        else:
            tmpdir = tempfile.gettempdir()
        self.root = os.path.join(tmpdir, build_uuid)
        os.mkdir(self.root, 0o700)
        self.work_root = os.path.join(self.root, 'work')
        os.makedirs(self.work_root)
        self.src_root = os.path.join(self.work_root, 'src')
        os.makedirs(self.src_root)
        self.log_root = os.path.join(self.work_root, 'logs')
        os.makedirs(self.log_root)
        # Create local tmp directory
        # NOTE(tobiash): This must live within the work root as it can be used
        # by ansible for temporary files which are path checked in untrusted
        # jobs.
        self.local_tmp = os.path.join(self.work_root, 'tmp')
        os.makedirs(self.local_tmp)
        self.ansible_root = os.path.join(self.root, 'ansible')
        os.makedirs(self.ansible_root)
        self.trusted_root = os.path.join(self.root, 'trusted')
        os.makedirs(self.trusted_root)
        self.untrusted_root = os.path.join(self.root, 'untrusted')
        os.makedirs(self.untrusted_root)
        ssh_dir = os.path.join(self.work_root, '.ssh')
        os.mkdir(ssh_dir, 0o700)
        # Create ansible cache directory
        self.ansible_cache_root = os.path.join(self.root, '.ansible')
        self.fact_cache = os.path.join(self.ansible_cache_root, 'fact-cache')
        os.makedirs(self.fact_cache)
        self.control_path = os.path.join(self.ansible_cache_root, 'cp')
        self.job_unreachable_file = os.path.join(self.ansible_cache_root,
                                                 'nodes.unreachable')
        os.makedirs(self.control_path)
        localhost_facts = os.path.join(self.fact_cache, 'localhost')
        # NOTE(pabelanger): We do not want to leak zuul-executor facts to
        # other playbooks now that smart fact gathering is enabled by
        # default. We can have ansible skip populating the cache with
        # information by doing the following.
        with open(localhost_facts, 'w') as f:
            f.write('{"module_setup": true}')
        self.result_data_file = os.path.join(self.work_root, 'results.json')
        with open(self.result_data_file, 'w'):
            pass
        self.known_hosts = os.path.join(ssh_dir, 'known_hosts')
        self.inventory = os.path.join(self.ansible_root, 'inventory.yaml')
        self.extra_vars = os.path.join(self.ansible_root, 'extra_vars.yaml')
        self.setup_inventory = os.path.join(self.ansible_root,
                                            'setup-inventory.yaml')
        self.logging_json = os.path.join(self.ansible_root, 'logging.json')
        self.playbooks = []  # The list of candidate playbooks
        self.playbook = None  # A pointer to the candidate we have chosen
        self.pre_playbooks = []
        self.post_playbooks = []
        self.job_output_file = os.path.join(self.log_root, 'job-output.txt')
        # We need to create the job-output.txt upfront in order to close the
        # gap between url reporting and ansible creating the file. Otherwise
        # there is a period of time where the user can click on the live log
        # link on the status page but the log streaming fails because the
        # file is not there yet.
        with open(self.job_output_file, 'w') as job_output:
            job_output.write("{now} | Job console starting...\n".format(
                now=datetime.datetime.now()
            ))
        self.trusted_projects = []
        self.trusted_project_index = {}
        self.untrusted_projects = []
        self.untrusted_project_index = {}

        # Create a JobDirPlaybook for the Ansible setup run. This
        # doesn't use an actual playbook, but it lets us use the same
        # methods to write an ansible.cfg as the rest of the Ansible
        # runs.
        setup_root = os.path.join(self.ansible_root, 'setup_playbook')
        os.makedirs(setup_root)
        self.setup_playbook = JobDirPlaybook(setup_root)
        self.setup_playbook.trusted = True

    def addTrustedProject(self, canonical_name, branch):
        # Trusted projects are placed in their own directories so that
        # we can support using different branches of the same project
        # in different playbooks.
        count = len(self.trusted_projects)
        root = os.path.join(self.trusted_root, 'project_%i' % (count,))
        os.makedirs(root)
        self.trusted_projects.append(root)
        self.trusted_project_index[(canonical_name, branch)] = root
        return root

    def getTrustedProject(self, canonical_name, branch):
        return self.trusted_project_index.get((canonical_name, branch))

    def addUntrustedProject(self, canonical_name, branch):
        # Similar to trusted projects, but these hold checkouts of
        # projects which are allowed to have speculative changes
        # applied. They might, however, be different branches than
        # what is used in the working dir, so they need their own
        # location. Moreover, we might avoid mischief if a job alters
        # the contents of the working dir.
        count = len(self.untrusted_projects)
        root = os.path.join(self.untrusted_root, 'project_%i' % (count,))
        os.makedirs(root)
        self.untrusted_projects.append(root)
        self.untrusted_project_index[(canonical_name, branch)] = root
        return root

    def getUntrustedProject(self, canonical_name, branch):
        return self.untrusted_project_index.get((canonical_name, branch))

    def addPrePlaybook(self):
        count = len(self.pre_playbooks)
        root = os.path.join(self.ansible_root, 'pre_playbook_%i' % (count,))
        os.makedirs(root)
        playbook = JobDirPlaybook(root)
        self.pre_playbooks.append(playbook)
        return playbook

    def addPostPlaybook(self):
        count = len(self.post_playbooks)
        root = os.path.join(self.ansible_root, 'post_playbook_%i' % (count,))
        os.makedirs(root)
        playbook = JobDirPlaybook(root)
        self.post_playbooks.append(playbook)
        return playbook

    def addPlaybook(self):
        count = len(self.playbooks)
        root = os.path.join(self.ansible_root, 'playbook_%i' % (count,))
        os.makedirs(root)
        playbook = JobDirPlaybook(root)
        self.playbooks.append(playbook)
        return playbook

    def cleanup(self):
        if not self.keep:
            shutil.rmtree(self.root)

    def __enter__(self):
        return self

    def __exit__(self, etype, value, tb):
        self.cleanup()
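

# Usage sketch (illustrative; the root path and UUID are hypothetical).
# JobDir is a context manager, so the whole tree sketched above is
# removed on exit unless keep=True:
#
#   with JobDir('/var/lib/zuul/builds', keep=False,
#               build_uuid='0123456789abcdef') as jobdir:
#       ...  # run the job; jobdir.root is .../0123456789abcdef
#   # __exit__ -> cleanup() -> shutil.rmtree(jobdir.root)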


class UpdateTask(object):
    def __init__(self, connection_name, project_name):
        self.connection_name = connection_name
        self.project_name = project_name
        self.canonical_name = None
        self.branches = None
        self.refs = None
        self.event = threading.Event()

    def __eq__(self, other):
        if (other and other.connection_name == self.connection_name and
            other.project_name == self.project_name):
            return True
        return False

    def wait(self):
        self.event.wait()

    def setComplete(self):
        self.event.set()


class DeduplicateQueue(object):
    def __init__(self):
        self.queue = collections.deque()
        self.condition = threading.Condition()

    def qsize(self):
        return len(self.queue)

    def put(self, item):
        # Returns the original item if added, or an equivalent item if
        # already enqueued.
        self.condition.acquire()
        ret = None
        try:
            for x in self.queue:
                if item == x:
                    ret = x
            if ret is None:
                ret = item
                self.queue.append(item)
            self.condition.notify()
        finally:
            self.condition.release()
        return ret

    def get(self):
        self.condition.acquire()
        try:
            while True:
                try:
                    ret = self.queue.popleft()
                    return ret
                except IndexError:
                    pass
                self.condition.wait()
        finally:
            self.condition.release()
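

# Behavior sketch (illustrative): put() relies on the items' __eq__,
# so two UpdateTasks for the same connection/project collapse into one
# queue entry and both callers wait on the same task:
#
#   q = DeduplicateQueue()
#   a = q.put(UpdateTask('gerrit', 'org/project'))
#   b = q.put(UpdateTask('gerrit', 'org/project'))
#   # a is b -> True; the queue holds a single entry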


def _copy_ansible_files(python_module, target_dir):
    library_path = os.path.dirname(os.path.abspath(python_module.__file__))
    for fn in os.listdir(library_path):
        if fn == "__pycache__":
            continue
        full_path = os.path.join(library_path, fn)
        if os.path.isdir(full_path):
            shutil.copytree(full_path, os.path.join(target_dir, fn))
        else:
            shutil.copy(os.path.join(library_path, fn), target_dir)
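

# Note (illustrative reading): this copies a Python package's files
# (skipping __pycache__) into target_dir, e.g. staging the zuul.ansible
# support modules somewhere Ansible can load them; the call sites are
# outside this excerpt.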


def check_varnames(var):
    # We block these in configloader, but block it here too to make
    # sure that a job doesn't pass variables named zuul or nodepool.
    if 'zuul' in var:
        raise Exception("Defining variables named 'zuul' is not allowed")
    if 'nodepool' in var:
        raise Exception("Defining variables named 'nodepool' is not allowed")
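

# Example (illustrative): check_varnames() guards any mapping of
# user-supplied variables before it reaches the inventory:
#
#   check_varnames({'myvar': 1})        # ok
#   check_varnames({'zuul': {}})        # raises Exception
#   check_varnames({'nodepool': {}})    # raises Exception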


def make_setup_inventory_dict(nodes):
    hosts = {}
    for node in nodes:
        if (node['host_vars']['ansible_connection'] in
            BLACKLISTED_ANSIBLE_CONNECTION_TYPES):
            continue
        hosts[node['name']] = node['host_vars']

    inventory = {
        'all': {
            'hosts': hosts,
        }
    }

    return inventory


def make_inventory_dict(nodes, args, all_vars):
    hosts = {}
    for node in nodes:
        hosts[node['name']] = node['host_vars']

    inventory = {
        'all': {
            'hosts': hosts,
            'vars': all_vars,
        }
    }

    for group in args['groups']:
        if 'children' not in inventory['all']:
            inventory['all']['children'] = dict()
        group_hosts = {}
        for node_name in group['nodes']:
            group_hosts[node_name] = None
        group_vars = args['group_vars'].get(group['name'], {}).copy()
        check_varnames(group_vars)
        inventory['all']['children'].update({
            group['name']: {
                'hosts': group_hosts,
                'vars': group_vars,
            }})

    return inventory
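

# Shape sketch (illustrative; node and group names are hypothetical).
# For one node "controller" in a group "ceph", make_inventory_dict()
# yields a structure like:
#
#   {'all': {
#       'hosts': {'controller': {...host_vars...}},
#       'vars': {...all_vars...},
#       'children': {'ceph': {'hosts': {'controller': None},
#                             'vars': {...group_vars...}}}}}
#
# which is written to inventory.yaml for Ansible to consume as a YAML
# inventory.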


class AnsibleJobLogAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        msg, kwargs = super(AnsibleJobLogAdapter, self).process(msg, kwargs)
        msg = '[build: %s] %s' % (kwargs['extra']['job'], msg)
        return msg, kwargs


class AnsibleJob(object):
    RESULT_NORMAL = 1
    RESULT_TIMED_OUT = 2
    RESULT_UNREACHABLE = 3
    RESULT_ABORTED = 4
    RESULT_DISK_FULL = 5

    RESULT_MAP = {
        RESULT_NORMAL: 'RESULT_NORMAL',
        RESULT_TIMED_OUT: 'RESULT_TIMED_OUT',
        RESULT_UNREACHABLE: 'RESULT_UNREACHABLE',
        RESULT_ABORTED: 'RESULT_ABORTED',
        RESULT_DISK_FULL: 'RESULT_DISK_FULL',
    }

    def __init__(self, executor_server, job):
        logger = logging.getLogger("zuul.AnsibleJob")
        self.log = AnsibleJobLogAdapter(logger, {'job': job.unique})
        self.executor_server = executor_server
        self.job = job
        self.arguments = json.loads(job.arguments)
        self.jobdir = None
        self.proc = None
        self.proc_lock = threading.Lock()
        self.running = False
        self.started = False  # Whether playbooks have started running
        self.paused = False
        self.aborted = False
        self.aborted_reason = None
        self._resume_event = threading.Event()
        self.thread = None
        self.project_info = {}
        self.private_key_file = get_default(self.executor_server.config,
                                            'executor', 'private_key_file',
                                            '~/.ssh/id_rsa')
        self.winrm_key_file = get_default(self.executor_server.config,
                                          'executor', 'winrm_cert_key_file',
                                          '~/.winrm/winrm_client_cert.key')
        self.winrm_pem_file = get_default(self.executor_server.config,
                                          'executor', 'winrm_cert_pem_file',
                                          '~/.winrm/winrm_client_cert.pem')
        self.winrm_operation_timeout = get_default(
            self.executor_server.config,
            'executor',
            'winrm_operation_timeout_sec')
        self.winrm_read_timeout = get_default(
            self.executor_server.config,
            'executor',
            'winrm_read_timeout_sec')
        self.ssh_agent = SshAgent()

        self.executor_variables_file = None

        self.cpu_times = {'user': 0, 'system': 0,
                          'children_user': 0, 'children_system': 0}

        if self.executor_server.config.has_option('executor', 'variables'):
            self.executor_variables_file = self.executor_server.config.get(
                'executor', 'variables')

    def run(self):
        self.running = True
        self.thread = threading.Thread(target=self.execute,
                                       name='build-%s' % self.job.unique)
        self.thread.start()

    def stop(self, reason=None):
        self.aborted = True
        self.aborted_reason = reason

        # if paused we need to resume the job so it can be stopped
        self.resume()
        self.abortRunningProc()

    def pause(self):
        self.log.info(
            "Pausing job %s for ref %s (change %s)" % (
                self.arguments['zuul']['job'],
                self.arguments['zuul']['ref'],
                self.arguments['zuul']['change_url']))
        with open(self.jobdir.job_output_file, 'a') as job_output:
            job_output.write(
                "{now} |\n"
                "{now} | Job paused\n".format(now=datetime.datetime.now()))

        self.paused = True

        data = {'paused': self.paused, 'data': self.getResultData()}
        self.job.sendWorkData(json.dumps(data))
        self._resume_event.wait()

    def resume(self):
        if not self.paused:
            return

        self.log.info(
            "Resuming job %s for ref %s (change %s)" % (
                self.arguments['zuul']['job'],
                self.arguments['zuul']['ref'],
                self.arguments['zuul']['change_url']))
        with open(self.jobdir.job_output_file, 'a') as job_output:
            job_output.write(
                "{now} | Job resumed\n"
                "{now} |\n".format(now=datetime.datetime.now()))

        self.paused = False
        self._resume_event.set()

    def wait(self):
        if self.thread:
            self.thread.join()

    def execute(self):
        try:
            self.ssh_agent.start()
            self.ssh_agent.add(self.private_key_file)
            for key in self.arguments.get('ssh_keys', []):
                self.ssh_agent.addData(key['name'], key['key'])
            self.jobdir = JobDir(self.executor_server.jobdir_root,
                                 self.executor_server.keep_jobdir,
                                 str(self.job.unique))
            self._execute()
        except ExecutorError as e:
            result_data = json.dumps(dict(result='ERROR',
                                          error_detail=e.args[0]))
            self.log.debug("Sending result: %s" % (result_data,))
            self.job.sendWorkComplete(result_data)
        except Exception:
            self.log.exception("Exception while executing job")
            self.job.sendWorkException(traceback.format_exc())
        finally:
            self.running = False
            if self.jobdir:
                try:
                    self.jobdir.cleanup()
                except Exception:
                    self.log.exception("Error cleaning up jobdir:")
            if self.ssh_agent:
                try:
                    self.ssh_agent.stop()
                except Exception:
                    self.log.exception("Error stopping SSH agent:")
            try:
                self.executor_server.finishJob(self.job.unique)
            except Exception:
                self.log.exception("Error finalizing job thread:")

    def _execute(self):
        args = self.arguments
        self.log.info(
            "Beginning job %s for ref %s (change %s)" % (
                args['zuul']['job'],
                args['zuul']['ref'],
                args['zuul']['change_url']))
        self.log.debug("Job root: %s" % (self.jobdir.root,))
        tasks = []
        projects = set()

        # Make sure all projects used by the job are updated...
        for project in args['projects']:
            self.log.debug("Updating project %s" % (project,))
            tasks.append(self.executor_server.update(
                project['connection'], project['name']))
            projects.add((project['connection'], project['name']))

        # ...as well as all playbook and role projects.
        repos = []
        playbooks = (args['pre_playbooks'] + args['playbooks'] +
                     args['post_playbooks'])
        for playbook in playbooks:
            repos.append(playbook)
            repos += playbook['roles']

        for repo in repos:
            self.log.debug("Updating playbook or role %s" % (repo['project'],))
            key = (repo['connection'], repo['project'])
            if key not in projects:
                tasks.append(self.executor_server.update(*key))
                projects.add(key)

        for task in tasks:
            task.wait()
            self.project_info[task.canonical_name] = {
                'refs': task.refs,
                'branches': task.branches,
            }

        self.log.debug("Git updates complete")
        merger = self.executor_server._getMerger(
            self.jobdir.src_root,
            self.executor_server.merge_root,
            self.log)
        repos = {}
        for project in args['projects']:
            self.log.debug("Cloning %s/%s" % (project['connection'],
                                              project['name'],))
            repo = merger.getRepo(project['connection'],
                                  project['name'])
            repos[project['canonical_name']] = repo

        # The commit ID of the original item (before merging). Used
        # later for line mapping.
        item_commit = None

        merge_items = [i for i in args['items'] if i.get('number')]
        if merge_items:
            item_commit = self.doMergeChanges(merger, merge_items,
                                              args['repo_state'])
            if item_commit is None:
                # There was a merge conflict and we have already sent
                # a work complete result, don't run any jobs
                return

        state_items = [i for i in args['items'] if not i.get('number')]
        if state_items:
            merger.setRepoState(state_items, args['repo_state'])

        for project in args['projects']:
            repo = repos[project['canonical_name']]
            # If this project is the Zuul project and this is a ref
            # rather than a change, checkout the ref.
            if (project['canonical_name'] ==
                args['zuul']['project']['canonical_name'] and
                (not args['zuul'].get('branch')) and
                args['zuul'].get('ref')):
                ref = args['zuul']['ref']
            else:
                ref = None
            selected_ref, selected_desc = self.resolveBranch(
                project['canonical_name'],
                ref,
                args['branch'],
                args['override_branch'],
                args['override_checkout'],
                project['override_branch'],
                project['override_checkout'],
                project['default_branch'])
            self.log.info("Checking out %s %s %s",
                          project['canonical_name'], selected_desc,
                          selected_ref)
            repo.checkout(selected_ref)

            # Update the inventory variables to indicate the ref we
            # checked out
            p = args['zuul']['projects'][project['canonical_name']]
            p['checkout'] = selected_ref

        # Set the URL of the origin remote for each repo to a bogus
        # value. Keeping the remote allows tools to use it to determine
        # which commits are part of the current change.
        for repo in repos.values():
            repo.setRemoteUrl('file:///dev/null')

        # This prepares each playbook and the roles needed for each.
        self.preparePlaybooks(args)

        self.prepareAnsibleFiles(args)
        self.writeLoggingConfig()

        data = {
            # TODO(mordred) worker_name is needed as a unique name for the
            # client to use for cancelling jobs on an executor. It's
            # defaulting to the hostname for now, but in the future we
            # should allow setting a per-executor override so that one can
            # run more than one executor on a host.
            'worker_name': self.executor_server.hostname,
            'worker_hostname': self.executor_server.hostname,
            'worker_log_port': self.executor_server.log_streaming_port
        }
        if self.executor_server.log_streaming_port != DEFAULT_FINGER_PORT:
            data['url'] = "finger://{hostname}:{port}/{uuid}".format(
                hostname=data['worker_hostname'],
                port=data['worker_log_port'],
                uuid=self.job.unique)
        else:
            data['url'] = 'finger://{hostname}/{uuid}'.format(
                hostname=data['worker_hostname'],
                uuid=self.job.unique)

        self.job.sendWorkData(json.dumps(data))
        self.job.sendWorkStatus(0, 100)

        result = self.runPlaybooks(args)

        # Stop the persistent SSH connections.
        setup_status, setup_code = self.runAnsibleCleanup(
            self.jobdir.setup_playbook)

        if self.aborted_reason == self.RESULT_DISK_FULL:
            result = 'DISK_FULL'
        data = self.getResultData()
        warnings = []
        self.mapLines(merger, args, data, item_commit, warnings)
        result_data = json.dumps(dict(result=result,
                                      warnings=warnings,
                                      data=data))
        self.log.debug("Sending result: %s" % (result_data,))
        self.job.sendWorkComplete(result_data)

    def getResultData(self):
        data = {}
        try:
            with open(self.jobdir.result_data_file) as f:
                file_data = f.read()
            if file_data:
                data = json.loads(file_data)
        except Exception:
            self.log.exception("Unable to load result data:")
        return data

    def mapLines(self, merger, args, data, commit, warnings):
        # The data and warnings arguments are mutated in this method.

        # If we received file comments, map the line numbers before
        # we send the result.
        fc = data.get('zuul', {}).get('file_comments')
        if not fc:
            return
        disable = data.get('zuul', {}).get(
            'disable_file_comment_line_mapping')
        if disable:
            return

        try:
            filecomments.validate(fc)
        except Exception as e:
            warnings.append("Job %s: validation error in file comments: %s" %
                            (args['zuul']['job'], str(e)))
            del data['zuul']['file_comments']
            return

        repo = None
        for project in args['projects']:
            if (project['canonical_name'] !=
                args['zuul']['project']['canonical_name']):
                continue
            repo = merger.getRepo(project['connection'],
                                  project['name'])
        # If the repo doesn't exist, abort
        if not repo:
            return

        # Check out the selected ref again in case the job altered the
        # repo state.
        p = args['zuul']['projects'][project['canonical_name']]
        selected_ref = p['checkout']

        self.log.info("Checking out %s %s for line mapping",
                      project['canonical_name'], selected_ref)
        try:
            repo.checkout(selected_ref)
        except Exception:
            # If checkout fails, abort
            self.log.exception("Error checking out repo for line mapping")
            warnings.append("Job %s: unable to check out repo "
                            "for file comments" % (args['zuul']['job']))
            return

        lines = filecomments.extractLines(fc)
        new_lines = {}
        for (filename, lineno) in lines:
            try:
                new_lineno = repo.mapLine(commit, filename, lineno)
            except Exception as e:
                # Log at debug level since it's likely a job issue
                self.log.debug("Error mapping line:", exc_info=True)
                if isinstance(e, git.GitCommandError):
                    msg = e.stderr
                else:
                    msg = str(e)
                warnings.append("Job %s: unable to map line "
                                "for file comments: %s" %
                                (args['zuul']['job'], msg))
                new_lineno = None
            if new_lineno is not None:
                new_lines[(filename, lineno)] = new_lineno

        filecomments.updateLines(fc, new_lines)

    def doMergeChanges(self, merger, items, repo_state):
        try:
            ret = merger.mergeChanges(items, repo_state=repo_state)
        except ValueError:
            # Return ABORTED so that we'll try again. At this point all of
            # the refs we're trying to merge should be valid refs. If we
            # can't fetch them, it should resolve itself.
            self.log.exception("Could not fetch refs to merge from remote")
            result = dict(result='ABORTED')
            self.job.sendWorkComplete(json.dumps(result))
            return None
        if not ret:  # merge conflict
            result = dict(result='MERGER_FAILURE')
            if self.executor_server.statsd:
                base_key = "zuul.executor.{hostname}.merger"
                self.executor_server.statsd.incr(base_key + ".FAILURE")
            self.job.sendWorkComplete(json.dumps(result))
            return None

        if self.executor_server.statsd:
            base_key = "zuul.executor.{hostname}.merger"
            self.executor_server.statsd.incr(base_key + ".SUCCESS")
        recent = ret[3]
        orig_commit = ret[4]
        for key, commit in recent.items():
            (connection, project, branch) = key
            repo = merger.getRepo(connection, project)
            repo.setRef('refs/heads/' + branch, commit)
        return orig_commit

    def resolveBranch(self, project_canonical_name, ref, zuul_branch,
                      job_override_branch, job_override_checkout,
                      project_override_branch, project_override_checkout,
                      project_default_branch):
        branches = self.project_info[project_canonical_name]['branches']
        refs = self.project_info[project_canonical_name]['refs']
        selected_ref = None
        selected_desc = None
        if project_override_checkout in refs:
            selected_ref = project_override_checkout
            selected_desc = 'project override ref'
        elif project_override_branch in branches:
            selected_ref = project_override_branch
            selected_desc = 'project override branch'
        elif job_override_checkout in refs:
            selected_ref = job_override_checkout
            selected_desc = 'job override ref'
        elif job_override_branch in branches:
            selected_ref = job_override_branch
            selected_desc = 'job override branch'
        elif ref and ref.startswith('refs/heads/'):
            selected_ref = ref[len('refs/heads/'):]
            selected_desc = 'branch ref'
        elif ref and ref.startswith('refs/tags/'):
            selected_ref = ref[len('refs/tags/'):]
            selected_desc = 'tag ref'
        elif zuul_branch and zuul_branch in branches:
            selected_ref = zuul_branch
            selected_desc = 'zuul branch'
        elif project_default_branch in branches:
            selected_ref = project_default_branch
            selected_desc = 'project default branch'
        else:
            raise ExecutorError("Project %s does not have the "
                                "default branch %s" %
                                (project_canonical_name,
                                 project_default_branch))
        return (selected_ref, selected_desc)
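
    # Precedence sketch (illustrative summary of the chain above):
    # project-level overrides beat job-level overrides, which beat the
    # item's own branch/tag ref, which beats the zuul branch; the
    # project default branch is the fallback, and a missing default
    # branch is a fatal ExecutorError rather than a retry.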

    def getAnsibleTimeout(self, start, timeout):
        if timeout is not None:
            now = time.time()
            elapsed = now - start
            timeout = timeout - elapsed
        return timeout

    def runPlaybooks(self, args):
        result = None

        # Run the Ansible 'setup' module on all hosts in the inventory
        # at the start of the job with a 60 second timeout. If we
        # aren't able to connect to all the hosts and gather facts
        # within that timeout, there is likely a network problem
        # between here and the hosts in the inventory; return them and
        # reschedule the job.
        setup_status, setup_code = self.runAnsibleSetup(
            self.jobdir.setup_playbook)
        if setup_status != self.RESULT_NORMAL or setup_code != 0:
            return result

        pre_failed = False
        success = False
        self.started = True
        time_started = time.time()
        # timeout value is "total" job timeout which accounts for
        # pre-run and run playbooks. post-run is different because
        # it is used to copy out job logs and we want to do our best
        # to copy logs even when the job has timed out.
        job_timeout = args['timeout']
        for index, playbook in enumerate(self.jobdir.pre_playbooks):
            # TODOv3(pabelanger): Implement pre-run timeout setting.
            ansible_timeout = self.getAnsibleTimeout(time_started,
                                                     job_timeout)
            pre_status, pre_code = self.runAnsiblePlaybook(
                playbook, ansible_timeout, phase='pre', index=index)
            if pre_status != self.RESULT_NORMAL or pre_code != 0:
                # These should really never fail, so return None and have
                # zuul try again
                pre_failed = True
                break

        self.log.debug(
            "Overall ansible cpu times: user=%.2f, system=%.2f, "
            "children_user=%.2f, children_system=%.2f" %
            (self.cpu_times['user'], self.cpu_times['system'],
             self.cpu_times['children_user'],
             self.cpu_times['children_system']))

        if not pre_failed:
            ansible_timeout = self.getAnsibleTimeout(time_started,
                                                     job_timeout)
            job_status, job_code = self.runAnsiblePlaybook(
                self.jobdir.playbook, ansible_timeout, phase='run')
            if job_status == self.RESULT_ABORTED:
                return 'ABORTED'
            elif job_status == self.RESULT_TIMED_OUT:
                # Set the pre-failure flag so this doesn't get
                # overridden by a post-failure.
                pre_failed = True
                result = 'TIMED_OUT'
            elif job_status == self.RESULT_NORMAL:
                success = (job_code == 0)
                if success:
                    result = 'SUCCESS'
                else:
                    result = 'FAILURE'
            else:
                # The result of the job is indeterminate. Zuul will
                # run it again.
                return None

            # check if we need to pause here
            result_data = self.getResultData()
            pause = result_data.get('zuul', {}).get('pause')
            if pause:
                self.pause()

        post_timeout = args['post_timeout']
        unreachable = False
        for index, playbook in enumerate(self.jobdir.post_playbooks):
            # Post timeout operates a little differently to the main job
            # timeout. We give each post playbook the full post timeout to
            # do its job because post is where you'll often record job logs
            # which are vital to understanding why timeouts have happened in
            # the first place.
            post_status, post_code = self.runAnsiblePlaybook(
                playbook, post_timeout, success, phase='post', index=index)
            if post_status == self.RESULT_ABORTED:
                return 'ABORTED'
            if post_status == self.RESULT_UNREACHABLE:
                # In case we encounter unreachable nodes we need to return
                # None so the job can be retried. However in the case of
                # post playbooks we should still try to run all playbooks
                # to get a chance to upload logs.
                unreachable = True
            if post_status != self.RESULT_NORMAL or post_code != 0:
                success = False
                # If we encountered a pre-failure, that takes
                # precedence over the post result.
                if not pre_failed:
                    result = 'POST_FAILURE'
                if (index + 1) == len(self.jobdir.post_playbooks):
                    self._logFinalPlaybookError()

        if unreachable:
            return None

        return result
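
    # Result precedence note (illustrative summary of the flow above):
    # ABORTED is returned immediately; unreachable nodes return None so
    # Zuul retries the build; a run-phase timeout sets pre_failed, so a
    # later post-playbook failure cannot downgrade TIMED_OUT to
    # POST_FAILURE; otherwise SUCCESS or FAILURE simply follows the run
    # playbook's exit code.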

    def _logFinalPlaybookError(self):
        # Failures in the final post playbook can include failures
        # uploading logs, which makes diagnosing issues difficult.
        # Grab the output from the last playbook from the json
        # file and log it.
        json_output = self.jobdir.job_output_file.replace('txt', 'json')
        self.log.debug("Final playbook failed")
        if not os.path.exists(json_output):
            self.log.debug("JSON logfile {logfile} is missing".format(
                logfile=json_output))
            return
        try:
            output = json.load(open(json_output, 'r'))
            last_playbook = output[-1]
            # Transform json to yaml - because it's easier to read and given
            # the size of the data it'll be extra-hard to read this as an
            # all on one line stringified nested dict.
            yaml_out = yaml.safe_dump(last_playbook, default_flow_style=False)
            for line in yaml_out.split('\n'):
                self.log.debug(line)
        except Exception:
            self.log.exception(
                "Could not decode json from {logfile}".format(
                    logfile=json_output))

    def getHostList(self, args):
        hosts = []
        for node in args['nodes']:
            # NOTE(mordred): This assumes that the nodepool launcher
            # and the zuul executor both have similar network
            # characteristics, as the launcher will do a test for ipv6
            # viability and if so, and if the node has an ipv6
            # address, it will be the interface_ip. force-ipv4 can be
            # set to True in the clouds.yaml for a cloud if this
            # results in the wrong thing being in interface_ip
            # TODO(jeblair): Move this notice to the docs.
            for name in node['name']:
                ip = node.get('interface_ip')
                port = node.get('connection_port', node.get('ssh_port', 22))
                host_vars = args['host_vars'].get(name, {}).copy()
                check_varnames(host_vars)
                host_vars.update(dict(
                    ansible_host=ip,
                    ansible_user=self.executor_server.default_username,
                    ansible_port=port,
                    nodepool=dict(
                        label=node.get('label'),
                        az=node.get('az'),
                        cloud=node.get('cloud'),
                        provider=node.get('provider'),
                        region=node.get('region'),
                        host_id=node.get('host_id'),
                        interface_ip=node.get('interface_ip'),
                        public_ipv4=node.get('public_ipv4'),
                        private_ipv4=node.get('private_ipv4'),
                        public_ipv6=node.get('public_ipv6'))))

                username = node.get('username')
                if username:
                    host_vars['ansible_user'] = username

                connection_type = node.get('connection_type')
                if connection_type:
                    host_vars['ansible_connection'] = connection_type
                    if connection_type == "winrm":
                        host_vars['ansible_winrm_transport'] = 'certificate'
                        host_vars['ansible_winrm_cert_pem'] = \
                            self.winrm_pem_file
                        host_vars['ansible_winrm_cert_key_pem'] = \
                            self.winrm_key_file
                        # NOTE(tobiash): This is necessary when using default
                        # winrm self-signed certificates. This is probably
                        # what most installations want so hard code this
                        # here for now.
                        host_vars['ansible_winrm_server_cert_validation'] = \
                            'ignore'
                        if self.winrm_operation_timeout is not None:
                            host_vars['ansible_winrm_operation_timeout_sec'] =\
                                self.winrm_operation_timeout
                        if self.winrm_read_timeout is not None:
                            host_vars['ansible_winrm_read_timeout_sec'] = \
                                self.winrm_read_timeout

                host_keys = []
                for key in node.get('host_keys'):
                    if port != 22:
                        host_keys.append("[%s]:%s %s" % (ip, port, key))
                    else:
                        host_keys.append("%s %s" % (ip, key))

                hosts.append(dict(
                    name=name,
                    host_vars=host_vars,
                    host_keys=host_keys))
        return hosts

    def _blockPluginDirs(self, path):
        '''Prevent execution of playbooks or roles with plugins

        Plugins are loaded from roles and also if there is a plugin
        dir adjacent to the playbook. Throw an error if the path
        contains a location that would cause a plugin to get loaded.
        '''
        for entry in os.listdir(path):
            entry = os.path.join(path, entry)
            if os.path.isdir(entry) and entry.endswith('_plugins'):
                raise ExecutorError(
                    "Ansible plugin dir %s found adjacent to playbook %s in "
                    "non-trusted repo." % (entry, path))

    def findPlaybook(self, path, trusted=False):
        if os.path.exists(path):
            if not trusted:
                # Plugins can be defined in multiple locations within the
                # playbook's subtree.
                #
                #  1. directly within the playbook:
                #       block playbook_dir/*_plugins
                #
                #  2. within a role defined in playbook_dir/<rolename>:
                #       block playbook_dir/*/*_plugins
                #
                #  3. within a role defined in playbook_dir/roles/<rolename>:
                #       block playbook_dir/roles/*/*_plugins

                playbook_dir = os.path.dirname(os.path.abspath(path))
                paths_to_check = []

                def addPathsToCheck(root_dir):
                    if os.path.isdir(root_dir):
                        for entry in os.listdir(root_dir):
                            entry = os.path.join(root_dir, entry)
                            if os.path.isdir(entry):
                                paths_to_check.append(entry)

                # handle case 1
                paths_to_check.append(playbook_dir)

                # handle case 2
                addPathsToCheck(playbook_dir)

                # handle case 3
                addPathsToCheck(os.path.join(playbook_dir, 'roles'))

                for path_to_check in paths_to_check:
                    self._blockPluginDirs(path_to_check)

            return path
        raise ExecutorError("Unable to find playbook %s" % path)
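
    # For illustration (editorial note, not part of the original source):
    # given a playbook at playbooks/run.yaml in an untrusted repo, the
    # checks above would reject any of these hypothetical layouts:
    #
    #   playbooks/action_plugins/               (case 1)
    #   playbooks/myrole/filter_plugins/        (case 2)
    #   playbooks/roles/myrole/action_plugins/  (case 3)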

    def preparePlaybooks(self, args):
        self.writeAnsibleConfig(self.jobdir.setup_playbook)

        for playbook in args['pre_playbooks']:
            jobdir_playbook = self.jobdir.addPrePlaybook()
            self.preparePlaybook(jobdir_playbook, playbook, args)

        for playbook in args['playbooks']:
            jobdir_playbook = self.jobdir.addPlaybook()
            self.preparePlaybook(jobdir_playbook, playbook, args)
            if jobdir_playbook.path is not None:
                self.jobdir.playbook = jobdir_playbook
                break

        if self.jobdir.playbook is None:
            raise ExecutorError("No playbook specified")

        for playbook in args['post_playbooks']:
            jobdir_playbook = self.jobdir.addPostPlaybook()
            self.preparePlaybook(jobdir_playbook, playbook, args)

    def preparePlaybook(self, jobdir_playbook, playbook, args):
        # Check out the playbook repo if needed and set the path to
        # the playbook that should be run.
        self.log.debug("Prepare playbook repo for %s: %s@%s" %
                       (playbook['trusted'] and 'trusted' or 'untrusted',
                        playbook['project'], playbook['branch']))
        source = self.executor_server.connections.getSource(
            playbook['connection'])
        project = source.getProject(playbook['project'])
        branch = playbook['branch']
        jobdir_playbook.trusted = playbook['trusted']
        jobdir_playbook.branch = branch
        jobdir_playbook.project_canonical_name = project.canonical_name
        jobdir_playbook.canonical_name_and_path = os.path.join(
            project.canonical_name, playbook['path'])
        path = None

        if not jobdir_playbook.trusted:
            path = self.checkoutUntrustedProject(project, branch, args)
        else:
            path = self.checkoutTrustedProject(project, branch)
        path = os.path.join(path, playbook['path'])

        jobdir_playbook.path = self.findPlaybook(
            path,
            trusted=jobdir_playbook.trusted)

        # If this playbook doesn't exist, don't bother preparing
        # roles.
        if not jobdir_playbook.path:
            return

        for role in playbook['roles']:
            self.prepareRole(jobdir_playbook, role, args)

        secrets = playbook['secrets']
        if secrets:
            check_varnames(secrets)
            jobdir_playbook.secrets_content = yaml.safe_dump(
                secrets, default_flow_style=False)

        self.writeAnsibleConfig(jobdir_playbook)

    def checkoutTrustedProject(self, project, branch):
        root = self.jobdir.getTrustedProject(project.canonical_name,
                                             branch)
        if not root:
            root = self.jobdir.addTrustedProject(project.canonical_name,
                                                 branch)
            self.log.debug("Cloning %s@%s into new trusted space %s",
                           project, branch, root)
            merger = self.executor_server._getMerger(
                root,
                self.executor_server.merge_root,
                self.log)
            merger.checkoutBranch(project.connection_name, project.name,
                                  branch)
        else:
            self.log.debug("Using existing repo %s@%s in trusted space %s",
                           project, branch, root)

        path = os.path.join(root,
                            project.canonical_hostname,
                            project.name)
        return path

    def checkoutUntrustedProject(self, project, branch, args):
        root = self.jobdir.getUntrustedProject(project.canonical_name,
                                               branch)
        if not root:
            root = self.jobdir.addUntrustedProject(project.canonical_name,
                                                   branch)
            # If the project is in the dependency chain, clone from
            # there so we pick up any speculative changes, otherwise,
            # clone from the cache.
            merger = None
            for p in args['projects']:
                if (p['connection'] == project.connection_name and
                        p['name'] == project.name):
                    # We already have this repo prepared
                    self.log.debug("Found workdir repo for untrusted project")
                    merger = self.executor_server._getMerger(
                        root,
                        self.jobdir.src_root,
                        self.log)
                    break

            if merger is None:
                merger = self.executor_server._getMerger(
                    root,
                    self.executor_server.merge_root,
                    self.log)

            self.log.debug("Cloning %s@%s into new untrusted space %s",
                           project, branch, root)
            merger.checkoutBranch(project.connection_name, project.name,
                                  branch)
        else:
            self.log.debug("Using existing repo %s@%s in untrusted space %s",
                           project, branch, root)

        path = os.path.join(root,
                            project.canonical_hostname,
                            project.name)
        return path
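
    # A note on the trusted/untrusted split (editorial, not part of the
    # original source): untrusted checkouts prefer the job's prepared
    # workdir repos so that speculative (not-yet-merged) changes to
    # playbooks and roles are exercised, while trusted checkouts always
    # come from the executor's cache and therefore only see merged state.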

    def prepareRole(self, jobdir_playbook, role, args):
        if role['type'] == 'zuul':
            root = jobdir_playbook.addRole()
            self.prepareZuulRole(jobdir_playbook, role, args, root)

    def findRole(self, path, trusted=False):
        d = os.path.join(path, 'tasks')
        if os.path.isdir(d):
            # This is a bare role
            if not trusted:
                self._blockPluginDirs(path)
            # None signifies that the repo is a bare role
            return None
        d = os.path.join(path, 'roles')
        if os.path.isdir(d):
            # This repo has a collection of roles
            if not trusted:
                self._blockPluginDirs(d)
                for entry in os.listdir(d):
                    entry_path = os.path.join(d, entry)
                    if os.path.isdir(entry_path):
                        self._blockPluginDirs(entry_path)
            return d
        # It is neither a bare role, nor a collection of roles
        raise RoleNotFoundError("Unable to find role in %s" % (path,))
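
    # For illustration (editorial note, not part of the original source):
    # the two repo layouts recognized above, for a hypothetical repo
    # "my-role-repo":
    #
    #   bare role (findRole returns None; the repo itself is the role):
    #     my-role-repo/tasks/main.yaml
    #
    #   collection of roles (findRole returns the roles/ directory):
    #     my-role-repo/roles/foo/tasks/main.yaml
    #     my-role-repo/roles/bar/tasks/main.yaml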

    def prepareZuulRole(self, jobdir_playbook, role, args, root):
        self.log.debug("Prepare zuul role for %s" % (role,))
        # Check out the role repo if needed
        source = self.executor_server.connections.getSource(
            role['connection'])
        project = source.getProject(role['project'])
        name = role['target_name']
        path = None

        # Find the branch to use for this role.  We should generally
        # follow the normal fallback procedure, unless this role's
        # project is the playbook's project, in which case we should
        # use the playbook branch.
        if jobdir_playbook.project_canonical_name == project.canonical_name:
            branch = jobdir_playbook.branch
            self.log.debug("Role project is playbook project, "
                           "using playbook branch %s", branch)
        else:
            # Find if the project is one of the job-specified projects.
            # If it is, we can honor the project checkout-override options.
            args_project = {}
            for p in args['projects']:
                if (p['canonical_name'] == project.canonical_name):
                    args_project = p
                    break

            branch, selected_desc = self.resolveBranch(
                project.canonical_name,
                None,
                args['branch'],
                args['override_branch'],
                args['override_checkout'],
                args_project.get('override_branch'),
                args_project.get('override_checkout'),
                role['project_default_branch'])
            self.log.debug("Role using %s %s", selected_desc, branch)

        if not jobdir_playbook.trusted:
            path = self.checkoutUntrustedProject(project, branch, args)
        else:
            path = self.checkoutTrustedProject(project, branch)

        # The name of the symlink is the requested name of the role
        # (which may be the repo name or may be something else; this
        # can come into play if this is a bare role).
        link = os.path.join(root, name)
        link = os.path.realpath(link)
        if not link.startswith(os.path.realpath(root)):
            # Reject role names that would escape the role root
            # (e.g. "../../etc").
            raise ExecutorError("Invalid role name %s" % name)
        os.symlink(path, link)

        try:
            role_path = self.findRole(link, trusted=jobdir_playbook.trusted)
        except RoleNotFoundError:
            if role['implicit']:
                self.log.debug("Implicit role not found in %s", link)
                return
            raise
        if role_path is None:
            # In the case of a bare role, add the containing directory
            role_path = root
        self.log.debug("Adding role path %s", role_path)
        jobdir_playbook.roles_path.append(role_path)

    def prepareKubeConfig(self, data):
        kube_cfg_path = os.path.join(self.jobdir.work_root, ".kube", "config")
        if os.path.exists(kube_cfg_path):
            kube_cfg = yaml.safe_load(open(kube_cfg_path))
        else:
            os.makedirs(os.path.dirname(kube_cfg_path), exist_ok=True)
            kube_cfg = {
                'apiVersion': 'v1',
                'kind': 'Config',
                'preferences': {},
                'users': [],
                'clusters': [],
                'contexts': [],
                'current-context': None,
            }
        # Add cluster
        cluster_name = urlsplit(data['host']).netloc.replace('.', '-')
        cluster = {
            'server': data['host'],
        }
        if data.get('ca_crt'):
            cluster['certificate-authority-data'] = data['ca_crt']
        if data['skiptls']:
            cluster['insecure-skip-tls-verify'] = True
        kube_cfg['clusters'].append({
            'name': cluster_name,
            'cluster': cluster,
        })

        # Add user
        user_name = "%s:%s" % (data['namespace'], data['user'])
        kube_cfg['users'].append({
            'name': user_name,
            'user': {
                'token': data['token'],
            },
        })

        # Add context
        data['context_name'] = "%s/%s" % (user_name, cluster_name)
        kube_cfg['contexts'].append({
            'name': data['context_name'],
            'context': {
                'user': user_name,
                'cluster': cluster_name,
                'namespace': data['namespace']
            }
        })
        if not kube_cfg['current-context']:
            kube_cfg['current-context'] = data['context_name']
        with open(kube_cfg_path, "w") as of:
            of.write(yaml.safe_dump(kube_cfg, default_flow_style=False))
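
    # For illustration (editorial note, not part of the original source):
    # given hypothetical data {'host': 'https://k8s.example.com',
    # 'namespace': 'zuul-ci', 'user': 'zuul-worker', 'token': '...',
    # 'skiptls': False}, the resulting kubeconfig fragment would look
    # roughly like:
    #
    #   clusters:
    #   - name: k8s-example-com
    #     cluster: {server: 'https://k8s.example.com'}
    #   users:
    #   - name: 'zuul-ci:zuul-worker'
    #     user: {token: '...'}
    #   contexts:
    #   - name: 'zuul-ci:zuul-worker/k8s-example-com'
    #     context: {user: 'zuul-ci:zuul-worker',
    #               cluster: k8s-example-com, namespace: zuul-ci}
    #   current-context: 'zuul-ci:zuul-worker/k8s-example-com'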

    def prepareAnsibleFiles(self, args):
        all_vars = args['vars'].copy()
        check_varnames(all_vars)
        # TODO(mordred) Hack to work around running things with python3
        all_vars['ansible_python_interpreter'] = '/usr/bin/python2'
        all_vars['zuul'] = args['zuul'].copy()
        all_vars['zuul']['executor'] = dict(
            hostname=self.executor_server.hostname,
            src_root=self.jobdir.src_root,
            log_root=self.jobdir.log_root,
            work_root=self.jobdir.work_root,
            result_data_file=self.jobdir.result_data_file,
            inventory_file=self.jobdir.inventory)

        resources_nodes = []
        all_vars['zuul']['resources'] = {}
        for node in args['nodes']:
            if node.get('connection_type') in (
                    'namespace', 'project', 'kubectl'):
                # TODO: decrypt resource data using scheduler key
                data = node['connection_port']
                # Setup kube/config file
                self.prepareKubeConfig(data)
                # Convert connection_port into kubectl connection parameters
                node['connection_port'] = None
                node['kubectl_namespace'] = data['namespace']
                node['kubectl_context'] = data['context_name']
                # Add node information to zuul_resources
                all_vars['zuul']['resources'][node['name'][0]] = {
                    'namespace': data['namespace'],
                    'context': data['context_name'],
                }
                if node['connection_type'] in ('project', 'namespace'):
                    # Projects are special nodes that are not part of
                    # the inventory
                    resources_nodes.append(node)
                else:
                    # Add the real pod name to the resources_var
                    all_vars['zuul']['resources'][
                        node['name'][0]]['pod'] = data['pod']
        # Remove resource nodes from the nodes list
        for node in resources_nodes:
            args['nodes'].remove(node)

        nodes = self.getHostList(args)
        setup_inventory = make_setup_inventory_dict(nodes)
        inventory = make_inventory_dict(nodes, args, all_vars)

        with open(self.jobdir.setup_inventory, 'w') as setup_inventory_yaml:
            setup_inventory_yaml.write(
                yaml.safe_dump(setup_inventory, default_flow_style=False))

        with open(self.jobdir.inventory, 'w') as inventory_yaml:
            inventory_yaml.write(
                yaml.safe_dump(inventory, default_flow_style=False))

        with open(self.jobdir.known_hosts, 'w') as known_hosts:
            for node in nodes:
                for key in node['host_keys']:
                    known_hosts.write('%s\n' % key)

        with open(self.jobdir.extra_vars, 'w') as extra_vars:
            extra_vars.write(
                yaml.safe_dump(args['extra_vars'], default_flow_style=False))
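
    # For illustration (editorial note, not part of the original source):
    # the known_hosts file written above is plain OpenSSH format, one key
    # per line, e.g.:
    #
    #   203.0.113.5 ssh-ed25519 AAAA...
    #   [203.0.113.6]:2222 ssh-rsa AAAA...
    #
    # and the inventory and extra-vars files are plain YAML, consumed via
    # the ansible.cfg written in writeAnsibleConfig() below.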

    def writeLoggingConfig(self):
        self.log.debug("Writing logging config for job %s %s",
                       self.jobdir.job_output_file,
                       self.jobdir.logging_json)
        logging_config = zuul.ansible.logconfig.JobLoggingConfig(
            job_output_file=self.jobdir.job_output_file)
        logging_config.writeJson(self.jobdir.logging_json)

    def writeAnsibleConfig(self, jobdir_playbook):
        trusted = jobdir_playbook.trusted

        # TODO(mordred) This should likely be extracted into a more
        # generalized mechanism for deployers being able to add
        # callback plugins.
        if ara_callbacks:
            callback_path = '%s:%s' % (
                self.executor_server.callback_dir,
                os.path.dirname(ara_callbacks.__file__))
        else:
            callback_path = self.executor_server.callback_dir
        with open(jobdir_playbook.ansible_config, 'w') as config:
            config.write('[defaults]\n')
            config.write('inventory = %s\n' % self.jobdir.inventory)
            config.write('local_tmp = %s\n' % self.jobdir.local_tmp)
            config.write('retry_files_enabled = False\n')
            config.write('gathering = smart\n')
            config.write('fact_caching = jsonfile\n')
            config.write('fact_caching_connection = %s\n' %
                         self.jobdir.fact_cache)
            config.write('library = %s\n'
                         % self.executor_server.library_dir)
            config.write('command_warnings = False\n')
            config.write('callback_plugins = %s\n' % callback_path)
            config.write('stdout_callback = zuul_stream\n')
            config.write('filter_plugins = %s\n'
                         % self.executor_server.filter_dir)
            # bump the timeout because busy nodes may take more than
            # 10s to respond
            config.write('timeout = 30\n')

            # We need at least the general action dir as this overwrites the
            # command action plugin for log streaming.
            action_dirs = [self.executor_server.action_dir_general]
            if not trusted:
                action_dirs.append(self.executor_server.action_dir)
                config.write('lookup_plugins = %s\n'
                             % self.executor_server.lookup_dir)

            config.write('action_plugins = %s\n'
                         % ':'.join(action_dirs))

            if jobdir_playbook.roles_path:
                config.write('roles_path = %s\n' % ':'.join(
                    jobdir_playbook.roles_path))

            # On playbooks with secrets we want to prevent the
            # printing of args since they may be passed to a task or a
            # role.  Otherwise, printing the args could be useful for
            # debugging.
            config.write('display_args_to_stdout = %s\n' %
                         str(not jobdir_playbook.secrets_content))

            # Increase the internal poll interval of ansible.
            # The default interval of 0.001s is optimized for interactive
            # ui at the expense of CPU load.  As we have a non-interactive
            # automation use case a longer poll interval is more suitable
            # and reduces CPU load of the ansible process.
            config.write('internal_poll_interval = 0.01\n')

            config.write('[ssh_connection]\n')
            # NOTE(pabelanger): Try up to 3 times to run a task on a host,
            # this helps to mitigate UNREACHABLE host errors with SSH.
            config.write('retries = 3\n')
            # NB: when setting pipelining = True, keep_remote_files
            # must be False (the default).  Otherwise it apparently
            # will override the pipelining option and effectively
            # disable it.  Pipelining has a side effect of running the
            # command without a tty (ie, without the -tt argument to
            # ssh).  We require this behavior so that if a job runs a
            # command which expects interactive input on a tty (such
            # as sudo) it does not hang.
            config.write('pipelining = True\n')
            config.write('control_path_dir = %s\n' % self.jobdir.control_path)
            ssh_args = "-o ControlMaster=auto -o ControlPersist=60s " \
                       "-o ServerAliveInterval=60 " \
                       "-o UserKnownHostsFile=%s" % self.jobdir.known_hosts
            config.write('ssh_args = %s\n' % ssh_args)
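
    # For illustration (editorial note, not part of the original source;
    # paths are hypothetical): a generated ansible.cfg looks roughly like:
    #
    #   [defaults]
    #   inventory = <jobdir>/ansible/inventory.yaml
    #   gathering = smart
    #   fact_caching = jsonfile
    #   stdout_callback = zuul_stream
    #   internal_poll_interval = 0.01
    #
    #   [ssh_connection]
    #   retries = 3
    #   pipelining = True
    #   ssh_args = -o ControlMaster=auto -o ControlPersist=60s ...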

    def _ansibleTimeout(self, msg):
        self.log.warning(msg)
        self.abortRunningProc()

    def abortRunningProc(self):
        with self.proc_lock:
            if not self.proc:
                self.log.debug("Abort: no process is running")
                return
            self.log.debug("Abort: sending kill signal to job "
                           "process group")
            try:
                pgid = os.getpgid(self.proc.pid)
                os.killpg(pgid, signal.SIGKILL)
            except Exception:
                self.log.exception("Exception while killing ansible process:")

    def runAnsible(self, cmd, timeout, playbook, wrapped=True):
        config_file = playbook.ansible_config
        env_copy = os.environ.copy()
        env_copy.update(self.ssh_agent.env)
        if ara_callbacks:
            env_copy['ARA_LOG_CONFIG'] = self.jobdir.logging_json
        env_copy['ZUUL_JOB_LOG_CONFIG'] = self.jobdir.logging_json
        env_copy['ZUUL_JOBDIR'] = self.jobdir.root
        env_copy['TMP'] = self.jobdir.local_tmp
        pythonpath = env_copy.get('PYTHONPATH')
        if pythonpath:
            pythonpath = [pythonpath]
        else:
            pythonpath = []
        pythonpath = [self.executor_server.ansible_dir] + pythonpath
        env_copy['PYTHONPATH'] = os.path.pathsep.join(pythonpath)

        if playbook.trusted:
            opt_prefix = 'trusted'
        else:
            opt_prefix = 'untrusted'
        ro_paths = get_default(self.executor_server.config, 'executor',
                               '%s_ro_paths' % opt_prefix)
        rw_paths = get_default(self.executor_server.config, 'executor',
                               '%s_rw_paths' % opt_prefix)
        ro_paths = ro_paths.split(":") if ro_paths else []
        rw_paths = rw_paths.split(":") if rw_paths else []

        ro_paths.append(self.executor_server.ansible_dir)
        ro_paths.append(self.jobdir.ansible_root)
        ro_paths.append(self.jobdir.trusted_root)
        ro_paths.append(self.jobdir.untrusted_root)
        ro_paths.append(playbook.root)

        rw_paths.append(self.jobdir.ansible_cache_root)

        if self.executor_variables_file:
            ro_paths.append(self.executor_variables_file)

        secrets = {}
        if playbook.secrets_content:
            secrets[playbook.secrets] = playbook.secrets_content

        if wrapped:
            wrapper = self.executor_server.execution_wrapper
        else:
            wrapper = self.executor_server.connections.drivers['nullwrap']

        context = wrapper.getExecutionContext(ro_paths, rw_paths, secrets)

        popen = context.getPopen(
            work_dir=self.jobdir.work_root,
            ssh_auth_sock=env_copy.get('SSH_AUTH_SOCK'))

        env_copy['ANSIBLE_CONFIG'] = config_file
        # NOTE(pabelanger): Default HOME variable to jobdir.work_root, as
        # it is possible we don't bind mount current zuul user home
        # directory.
        env_copy['HOME'] = self.jobdir.work_root

        with self.proc_lock:
            if self.aborted:
                return (self.RESULT_ABORTED, None)
            self.log.debug("Ansible command: ANSIBLE_CONFIG=%s %s",
                           config_file, " ".join(shlex.quote(c) for c in cmd))
            self.proc = popen(
                cmd,
                cwd=self.jobdir.work_root,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                preexec_fn=os.setsid,
                env=env_copy,
            )

        syntax_buffer = []
        ret = None
        if timeout:
            watchdog = Watchdog(timeout, self._ansibleTimeout,
                                ("Ansible timeout exceeded",))
            watchdog.start()
        try:
            # Use manual idx instead of enumerate so that RESULT lines
            # don't count towards BUFFER_LINES_FOR_SYNTAX
            idx = 0
            for line in iter(self.proc.stdout.readline, b''):
                if line.startswith(b'RESULT'):
                    # TODO(mordred) Process result commands if sent
                    continue
                else:
                    idx += 1
                if idx < BUFFER_LINES_FOR_SYNTAX:
                    syntax_buffer.append(line)
                line = line[:1024].rstrip()
                self.log.debug("Ansible output: %s" % (line,))
            self.log.debug("Ansible output terminated")
            cpu_times = self.proc.cpu_times()
            self.log.debug("Ansible cpu times: user=%.2f, system=%.2f, "
                           "children_user=%.2f, "
                           "children_system=%.2f" %
                           (cpu_times.user, cpu_times.system,
                            cpu_times.children_user,
                            cpu_times.children_system))
            self.cpu_times['user'] += cpu_times.user
            self.cpu_times['system'] += cpu_times.system
            self.cpu_times['children_user'] += cpu_times.children_user
            self.cpu_times['children_system'] += cpu_times.children_system
            ret = self.proc.wait()
            self.log.debug("Ansible exit code: %s" % (ret,))
        finally:
            if timeout:
                watchdog.stop()
                self.log.debug("Stopped watchdog")
            self.log.debug("Stopped disk job killer")

        with self.proc_lock:
            self.proc = None

        if timeout and watchdog.timed_out:
            return (self.RESULT_TIMED_OUT, None)
        # Note: contrary to its documentation, Ansible currently returns 4
        # on unreachable hosts, so we have the zuul_unreachable callback
        # module that creates the file job-output.unreachable in case there
        # were unreachable nodes.  This can be removed once ansible returns
        # a distinct value for unreachable.
        if ret == 3 or os.path.exists(self.jobdir.job_unreachable_file):
            # AnsibleHostUnreachable: We had a network issue connecting to
            # our zuul-worker.
            return (self.RESULT_UNREACHABLE, None)
        elif ret == -9:
            # Received abort request.
            return (self.RESULT_ABORTED, None)
        elif ret == 1:
            with open(self.jobdir.job_output_file, 'a') as job_output:
                found_marker = False
                for line in syntax_buffer:
                    if line.startswith(b'ERROR!'):
                        found_marker = True
                    if not found_marker:
                        continue
                    job_output.write("{now} | {line}\n".format(
                        now=datetime.datetime.now(),
                        line=line.decode('utf-8').rstrip()))
        elif ret == 4:
            # Ansible could not parse the yaml.
            self.log.debug("Ansible parse error")
            # TODO(mordred) If/when we rework use of logger in
            # ansible-playbook we'll want to change how this works to use
            # that as well.  For now, this is what we need to do.
            # TODO(mordred) We probably want to put this into the json
            # output as well.
            with open(self.jobdir.job_output_file, 'a') as job_output:
                job_output.write("{now} | ANSIBLE PARSE ERROR\n".format(
                    now=datetime.datetime.now()))
                for line in syntax_buffer:
                    job_output.write("{now} | {line}\n".format(
                        now=datetime.datetime.now(),
                        line=line.decode('utf-8').rstrip()))
        elif ret == 250:
            # Unexpected error from ansible
            with open(self.jobdir.job_output_file, 'a') as job_output:
                job_output.write("{now} | UNEXPECTED ANSIBLE ERROR\n".format(
                    now=datetime.datetime.now()))
                found_marker = False
                for line in syntax_buffer:
                    if line.startswith(b'ERROR! Unexpected Exception'):
                        found_marker = True
                    if not found_marker:
                        continue
                    job_output.write("{now} | {line}\n".format(
                        now=datetime.datetime.now(),
                        line=line.decode('utf-8').rstrip()))

        return (self.RESULT_NORMAL, ret)
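
    # Summary of the exit-code handling above (editorial note, not part of
    # the original source): 3 or a job-output.unreachable file maps to
    # RESULT_UNREACHABLE; -9 (killed by the abort signal) to
    # RESULT_ABORTED; 1 copies the buffered ERROR! lines into the job
    # output; 4 is a parse error; 250 is an unexpected Ansible exception.
    # Anything else is reported as RESULT_NORMAL with the raw return code.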

    def runAnsibleSetup(self, playbook):
        if self.executor_server.verbose:
            verbose = '-vvv'
        else:
            verbose = '-v'

        cmd = ['ansible', '*', verbose, '-m', 'setup',
               '-i', self.jobdir.setup_inventory,
               '-a', 'gather_subset=!all']
        if self.executor_variables_file is not None:
            cmd.extend(['-e@%s' % self.executor_variables_file])

        result, code = self.runAnsible(
            cmd=cmd, timeout=60, playbook=playbook,
            wrapped=False)
        self.log.debug("Ansible complete, result %s code %s" % (
            self.RESULT_MAP[result], code))
        if self.executor_server.statsd:
            base_key = "zuul.executor.{hostname}.phase.setup"
            self.executor_server.statsd.incr(base_key + ".%s" %
                                             self.RESULT_MAP[result])
        return result, code

    def runAnsibleCleanup(self, playbook):
        # TODO(jeblair): This requires a bugfix in Ansible 2.4
        # Once this is used, increase the controlpersist timeout.
        return (self.RESULT_NORMAL, 0)

        if self.executor_server.verbose:
            verbose = '-vvv'
        else:
            verbose = '-v'

        cmd = ['ansible', '*', verbose, '-m', 'meta',
               '-a', 'reset_connection']

        result, code = self.runAnsible(
            cmd=cmd, timeout=60, playbook=playbook,
            wrapped=False)
        self.log.debug("Ansible complete, result %s code %s" % (
            self.RESULT_MAP[result], code))
        if self.executor_server.statsd:
            base_key = "zuul.executor.{hostname}.phase.cleanup"
            self.executor_server.statsd.incr(base_key + ".%s" %
                                             self.RESULT_MAP[result])
        return result, code

    def emitPlaybookBanner(self, playbook, step, phase, result=None):
        # This is used to print a header and a footer, respectively at the
        # beginning and the end of each playbook execution.
        # We are doing it from the executor rather than from a callback
        # because the parameters are not made available to the callback
        # until it's too late.
        phase = phase or ''
        trusted = playbook.trusted
        trusted = 'trusted' if trusted else 'untrusted'
        branch = playbook.branch
        playbook = playbook.canonical_name_and_path

        if phase and phase != 'run':
            phase = '{phase}-run'.format(phase=phase)
        phase = phase.upper()

        if result is not None:
            result = self.RESULT_MAP[result]
            msg = "{phase} {step} {result}: [{trusted} : {playbook}@{branch}]"
            msg = msg.format(phase=phase, step=step, result=result,
                             trusted=trusted, playbook=playbook,
                             branch=branch)
        else:
            msg = "{phase} {step}: [{trusted} : {playbook}@{branch}]"
            msg = msg.format(phase=phase, step=step, trusted=trusted,
                             playbook=playbook, branch=branch)

        with open(self.jobdir.job_output_file, 'a') as job_output:
            job_output.write("{now} | {msg}\n".format(
                now=datetime.datetime.now(),
                msg=msg))

    def runAnsiblePlaybook(self, playbook, timeout, success=None,
                           phase=None, index=None):
        if self.executor_server.verbose:
            verbose = '-vvv'
        else:
            verbose = '-v'

        cmd = ['ansible-playbook', verbose, playbook.path]
        if playbook.secrets_content:
            cmd.extend(['-e', '@' + playbook.secrets])

        cmd.extend(['-e', '@' + self.jobdir.extra_vars])

        if success is not None:
            cmd.extend(['-e', 'zuul_success=%s' % str(bool(success))])

        if phase:
            cmd.extend(['-e', 'zuul_execution_phase=%s' % phase])

        if index is not None:
            cmd.extend(['-e', 'zuul_execution_phase_index=%s' % index])

        cmd.extend(['-e',
                    'zuul_execution_trusted=%s' % str(playbook.trusted)])
        cmd.extend([
            '-e',
            'zuul_execution_canonical_name_and_path=%s'
            % playbook.canonical_name_and_path])
        cmd.extend(['-e', 'zuul_execution_branch=%s' % str(playbook.branch)])

        if self.executor_variables_file is not None:
            cmd.extend(['-e@%s' % self.executor_variables_file])

        self.emitPlaybookBanner(playbook, 'START', phase)

        result, code = self.runAnsible(
            cmd=cmd, timeout=timeout, playbook=playbook)
        self.log.debug("Ansible complete, result %s code %s" % (
            self.RESULT_MAP[result], code))
        if self.executor_server.statsd:
            base_key = "zuul.executor.{hostname}.phase.{phase}"
            self.executor_server.statsd.incr(
                base_key + ".{result}",
                result=self.RESULT_MAP[result],
                phase=phase or 'unknown')

        self.emitPlaybookBanner(playbook, 'END', phase, result=result)
        return result, code
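
    # For illustration (editorial note, not part of the original source;
    # placeholder paths): for a "pre" phase playbook the assembled command
    # looks roughly like:
    #
    #   ansible-playbook -v <playbook-path> \
    #       -e @<jobdir>/extra_vars.yaml \
    #       -e zuul_execution_phase=pre \
    #       -e zuul_execution_phase_index=0 \
    #       -e zuul_execution_trusted=False \
    #       -e zuul_execution_canonical_name_and_path=<project>/<path> \
    #       -e zuul_execution_branch=master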


class ExecutorMergeWorker(gear.TextWorker):
    def __init__(self, executor_server, *args, **kw):
        self.zuul_executor_server = executor_server
        super(ExecutorMergeWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        # Wait until the update queue is empty before responding
        while self.zuul_executor_server.update_queue.qsize():
            time.sleep(1)

        with self.zuul_executor_server.merger_lock:
            super(ExecutorMergeWorker, self).handleNoop(packet)


class ExecutorExecuteWorker(gear.TextWorker):
    def __init__(self, executor_server, *args, **kw):
        self.zuul_executor_server = executor_server
        super(ExecutorExecuteWorker, self).__init__(*args, **kw)

    def handleNoop(self, packet):
        # Delay our response to running a new job based on the number
        # of jobs we're currently running, in an attempt to spread
        # load evenly among executors.
        workers = len(self.zuul_executor_server.job_workers)
        delay = (workers ** 2) / 1000.0
        time.sleep(delay)
        return super(ExecutorExecuteWorker, self).handleNoop(packet)
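
# Editorial note (not part of the original source): the quadratic delay
# above grows quickly but stays small in absolute terms -- with 10 running
# jobs the worker waits 10**2 / 1000 = 0.1s before accepting another, and
# with 30 jobs it waits 0.9s, so lightly loaded executors tend to win the
# race for new work.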


class ExecutorServer(object):
    log = logging.getLogger("zuul.ExecutorServer")
    _job_class = AnsibleJob

    def __init__(self, config, connections={}, jobdir_root=None,
                 keep_jobdir=False, log_streaming_port=DEFAULT_FINGER_PORT):
        self.config = config
        self.keep_jobdir = keep_jobdir
        self.jobdir_root = jobdir_root
        # TODOv3(mordred): make the executor name more unique --
        # perhaps hostname+pid.
        self.hostname = get_default(self.config, 'executor', 'hostname',
                                    socket.getfqdn())
        self.log_streaming_port = log_streaming_port
        self.merger_lock = threading.Lock()
        self.governor_lock = threading.Lock()
        self.run_lock = threading.Lock()
        self.verbose = False
        self.command_map = dict(
            stop=self.stop,
            pause=self.pause,
            unpause=self.unpause,
            graceful=self.graceful,
            verbose=self.verboseOn,
            unverbose=self.verboseOff,
            keep=self.keep,
            nokeep=self.nokeep,
        )

        statsd_extra_keys = {'hostname': self.hostname}
        self.statsd = get_statsd(config, statsd_extra_keys)
        self.merge_root = get_default(self.config, 'executor', 'git_dir',
                                      '/var/lib/zuul/executor-git')
        self.default_username = get_default(self.config, 'executor',
                                            'default_username', 'zuul')
        self.disk_limit_per_job = int(get_default(self.config, 'executor',
                                                  'disk_limit_per_job', 250))
        self.zone = get_default(self.config, 'executor', 'zone')
        self.merge_email = get_default(self.config, 'merger',
                                       'git_user_email')
        self.merge_name = get_default(self.config, 'merger', 'git_user_name')
        self.merge_speed_limit = get_default(
            config, 'merger', 'git_http_low_speed_limit', '1000')
        self.merge_speed_time = get_default(
            config, 'merger', 'git_http_low_speed_time', '30')
        # If the execution driver ever becomes configurable again,
        # this is where it would happen.
        execution_wrapper_name = 'bubblewrap'
        self.accepting_work = False
        self.execution_wrapper = connections.drivers[execution_wrapper_name]

        self.connections = connections
        # This merger and its git repos are used to maintain
        # up-to-date copies of all the repos that are used by jobs, as
        # well as to support the merger:cat function to supply
        # configuration information to Zuul when it starts.
        self.merger = self._getMerger(self.merge_root, None)
        self.update_queue = DeduplicateQueue()

        command_socket = get_default(
            self.config, 'executor', 'command_socket',
            '/var/lib/zuul/executor.socket')
        self.command_socket = commandsocket.CommandSocket(command_socket)

        state_dir = get_default(self.config, 'executor', 'state_dir',
                                '/var/lib/zuul', expand_user=True)
        ansible_dir = os.path.join(state_dir, 'ansible')
        self.ansible_dir = ansible_dir
        if os.path.exists(ansible_dir):
            shutil.rmtree(ansible_dir)

        zuul_dir = os.path.join(ansible_dir, 'zuul')
        plugin_dir = os.path.join(zuul_dir, 'ansible')

        os.makedirs(plugin_dir, mode=0o0755)

        self.library_dir = os.path.join(plugin_dir, 'library')
        self.action_dir = os.path.join(plugin_dir, 'action')
        self.action_dir_general = os.path.join(plugin_dir, 'actiongeneral')
        self.callback_dir = os.path.join(plugin_dir, 'callback')
        self.lookup_dir = os.path.join(plugin_dir, 'lookup')
        self.filter_dir = os.path.join(plugin_dir, 'filter')

        _copy_ansible_files(zuul.ansible, plugin_dir)

        # We're copying zuul.ansible.* into a directory we are going
        # to add to pythonpath, so our plugins can "import
        # zuul.ansible".  But we're not installing all of zuul, so
        # create a __init__.py file for the stub "zuul" module.
        with open(os.path.join(zuul_dir, '__init__.py'), 'w'):
            pass

        # If keep is not set, ensure the job dir is empty on startup,
        # in case we were uncleanly shut down.
        if not self.keep_jobdir:
            for fn in os.listdir(self.jobdir_root):
                # Test the full path; the bare listdir entry would be
                # resolved relative to the current working directory.
                if not os.path.isdir(os.path.join(self.jobdir_root, fn)):
                    continue
                self.log.info("Deleting stale jobdir %s", fn)
                shutil.rmtree(os.path.join(self.jobdir_root, fn))

        self.job_workers = {}
        self.disk_accountant = DiskAccountant(self.jobdir_root,
                                              self.disk_limit_per_job,
                                              self.stopJobDiskFull,
                                              self.merge_root)

        self.pause_sensor = PauseSensor()
        cpu_sensor = CPUSensor(config)
        self.sensors = [
            cpu_sensor,
            HDDSensor(config),
            self.pause_sensor,
            RAMSensor(config),
            StartingBuildsSensor(self, cpu_sensor.max_load_avg)
        ]

    def _getMerger(self, root, cache_root, logger=None):
        return zuul.merger.merger.Merger(
            root, self.connections, self.merge_email, self.merge_name,
            self.merge_speed_limit, self.merge_speed_time, cache_root,
            logger, execution_context=True)

    def start(self):
        self._running = True
        self._command_running = True
        server = self.config.get('gearman', 'server')
        port = get_default(self.config, 'gearman', 'port', 4730)
        ssl_key = get_default(self.config, 'gearman', 'ssl_key')
        ssl_cert = get_default(self.config, 'gearman', 'ssl_cert')
        ssl_ca = get_default(self.config, 'gearman', 'ssl_ca')
        self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger')
        self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca)
        self.executor_worker = ExecutorExecuteWorker(
            self, 'Zuul Executor Server')
        self.executor_worker.addServer(server, port, ssl_key, ssl_cert,
                                       ssl_ca)
        self.log.debug("Waiting for server")
        self.merger_worker.waitForServer()
        self.executor_worker.waitForServer()
        self.log.debug("Registering")
        self.register()

        self.log.debug("Starting command processor")
        self.command_socket.start()
        self.command_thread = threading.Thread(target=self.runCommand,
                                               name='command')
        self.command_thread.daemon = True
        self.command_thread.start()

        self.log.debug("Starting worker")
        self.update_thread = threading.Thread(target=self._updateLoop,
                                              name='update')
        self.update_thread.daemon = True
        self.update_thread.start()
        self.merger_thread = threading.Thread(target=self.run_merger,
                                              name='merger')
        self.merger_thread.daemon = True
        self.merger_thread.start()
        self.executor_thread = threading.Thread(target=self.run_executor,
                                                name='executor')
        self.executor_thread.daemon = True
        self.executor_thread.start()
        self.governor_stop_event = threading.Event()
        self.governor_thread = threading.Thread(target=self.run_governor,
                                                name='governor')
        self.governor_thread.daemon = True
        self.governor_thread.start()
        self.disk_accountant.start()
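
    # Editorial summary (not part of the original source): after start()
    # the executor runs five daemon threads -- command (control socket),
    # update (repo refresh), merger and executor (gearman job loops), and
    # governor (load management) -- plus the disk accountant.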

    def register(self):
        self.register_work()
        self.executor_worker.registerFunction("executor:resume:%s" %
                                              self.hostname)
        self.executor_worker.registerFunction("executor:stop:%s" %
                                              self.hostname)
        self.merger_worker.registerFunction("merger:merge")
        self.merger_worker.registerFunction("merger:cat")
        self.merger_worker.registerFunction("merger:refstate")
        self.merger_worker.registerFunction("merger:fileschanges")

    def register_work(self):
        if self._running:
            self.accepting_work = True
            function_name = 'executor:execute'
            if self.zone:
                function_name += ':%s' % self.zone
            self.executor_worker.registerFunction(function_name)
            # TODO(jeblair): Update geard to send a noop after
            # registering for a job which is in the queue, then remove
            # this API violation.
            self.executor_worker._sendGrabJobUniq()

    def unregister_work(self):
        self.accepting_work = False
        function_name = 'executor:execute'
        if self.zone:
            function_name += ':%s' % self.zone
        self.executor_worker.unRegisterFunction(function_name)

    def stop(self):
        self.log.debug("Stopping")
        self.disk_accountant.stop()
        # The governor can change function registration, so make sure
        # it has stopped.
        self.governor_stop_event.set()
        self.governor_thread.join()
        # Stop accepting new jobs
        self.merger_worker.setFunctions([])
        self.executor_worker.setFunctions([])
        # Tell the executor worker to abort any jobs it just accepted,
        # and grab the list of currently running job workers.
        with self.run_lock:
            self._running = False
            self._command_running = False
            workers = list(self.job_workers.values())
        self.command_socket.stop()

        for job_worker in workers:
            try:
                job_worker.stop()
            except Exception:
                self.log.exception("Exception sending stop command "
                                   "to worker:")
        for job_worker in workers:
            try:
                job_worker.wait()
            except Exception:
                self.log.exception("Exception waiting for worker "
                                   "to stop:")

        # Now that we aren't accepting any new jobs, and all of the
        # running jobs have stopped, tell the update processor to
        # stop.
        self.update_queue.put(None)

        # All job results should have been sent by now, shutdown the
        # gearman workers.
        self.merger_worker.shutdown()
        self.executor_worker.shutdown()

        if self.statsd:
            base_key = 'zuul.executor.{hostname}'
            self.statsd.gauge(base_key + '.load_average', 0)
            self.statsd.gauge(base_key + '.pct_used_ram', 0)
            self.statsd.gauge(base_key + '.running_builds', 0)

        self.log.debug("Stopped")

    def join(self):
        self.governor_thread.join()
        self.update_thread.join()
        self.merger_thread.join()
        self.executor_thread.join()

    def pause(self):
        self.pause_sensor.pause = True

    def unpause(self):
        self.pause_sensor.pause = False

    def graceful(self):
        # TODOv3: implement
        pass

    def verboseOn(self):
        self.verbose = True

    def verboseOff(self):
        self.verbose = False

    def keep(self):
        self.keep_jobdir = True

    def nokeep(self):
        self.keep_jobdir = False

    def runCommand(self):
        while self._command_running:
            try:
                command = self.command_socket.get().decode('utf8')
                if command != '_stop':
                    self.command_map[command]()
            except Exception:
                self.log.exception("Exception while processing command")

    def _updateLoop(self):
        while True:
            try:
                self._innerUpdateLoop()
            except StopException:
                return
            except Exception:
                self.log.exception("Exception in update thread:")

    def _innerUpdateLoop(self):
        # Inside of a loop that keeps the main repositories up to date
        task = self.update_queue.get()
        if task is None:
            # We are asked to stop
            raise StopException()
        with self.merger_lock:
            self.log.info("Updating repo %s/%s" % (
                task.connection_name, task.project_name))
            self.merger.updateRepo(task.connection_name, task.project_name)
            repo = self.merger.getRepo(task.connection_name,
                                       task.project_name)
            source = self.connections.getSource(task.connection_name)
            project = source.getProject(task.project_name)
            task.canonical_name = project.canonical_name
            task.branches = repo.getBranches()
            task.refs = [r.name for r in repo.getRefs()]
            self.log.debug("Finished updating repo %s/%s" %
                           (task.connection_name, task.project_name))
        task.setComplete()

    def update(self, connection_name, project_name):
        # Update a repository in the main merger
        task = UpdateTask(connection_name, project_name)
        task = self.update_queue.put(task)
        return task

    def run_merger(self):
        self.log.debug("Starting merger listener")
        while self._running:
            try:
                job = self.merger_worker.getJob()
                try:
                    self.mergerJobDispatch(job)
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(
                        traceback.format_exc().encode('utf8'))
            except gear.InterruptedError:
                pass
            except Exception:
                self.log.exception("Exception while getting job")

    def mergerJobDispatch(self, job):
        if job.name == 'merger:cat':
            self.log.debug("Got cat job: %s" % job.unique)
            self.cat(job)
        elif job.name == 'merger:merge':
            self.log.debug("Got merge job: %s" % job.unique)
            self.merge(job)
        elif job.name == 'merger:refstate':
            self.log.debug("Got refstate job: %s" % job.unique)
            self.refstate(job)
        elif job.name == 'merger:fileschanges':
            self.log.debug("Got fileschanges job: %s" % job.unique)
            self.fileschanges(job)
        else:
            self.log.error("Unable to handle job %s" % job.name)
            job.sendWorkFail()

    def run_executor(self):
        self.log.debug("Starting executor listener")
        while self._running:
            try:
                job = self.executor_worker.getJob()
                try:
                    self.executorJobDispatch(job)
                except Exception:
                    self.log.exception("Exception while running job")
                    job.sendWorkException(
                        traceback.format_exc().encode('utf8'))
            except gear.InterruptedError:
                pass
            except Exception:
                self.log.exception("Exception while getting job")

    def executorJobDispatch(self, job):
        with self.run_lock:
            if not self._running:
                job.sendWorkFail()
                return
            function_name = 'executor:execute'
            if self.zone:
                function_name += ':%s' % self.zone
            if job.name == function_name:
                self.log.debug("Got %s job: %s" %
                               (function_name, job.unique))
                self.executeJob(job)
            elif job.name.startswith('executor:resume'):
                self.log.debug("Got resume job: %s" % job.unique)
                self.resumeJob(job)
            elif job.name.startswith('executor:stop'):
                self.log.debug("Got stop job: %s" % job.unique)
                self.stopJob(job)
            else:
                self.log.error("Unable to handle job %s" % job.name)
                job.sendWorkFail()

    def executeJob(self, job):
        if self.statsd:
            base_key = 'zuul.executor.{hostname}'
            self.statsd.incr(base_key + '.builds')
        self.job_workers[job.unique] = self._job_class(self, job)
        # Run manageLoad before starting the thread mostly for the
        # benefit of the unit tests to make the calculation of the
        # number of starting jobs more deterministic.
        self.manageLoad()
        self.job_workers[job.unique].run()

    def run_governor(self):
        while not self.governor_stop_event.wait(10):
            try:
                self.manageLoad()
            except Exception:
                self.log.exception("Exception in governor thread:")

    def manageLoad(self):
        '''Apply some heuristics to decide whether or not we should
        be asking for more jobs'''
        with self.governor_lock:
            return self._manageLoad()

    def _manageLoad(self):
        if self.accepting_work:
            # Don't unregister if we don't have any active jobs.
            for sensor in self.sensors:
                ok, message = sensor.isOk()
                if not ok:
                    self.log.info(
                        "Unregistering due to {}".format(message))
                    self.unregister_work()
                    break
        else:
            reregister = True
            limits = []
            for sensor in self.sensors:
                ok, message = sensor.isOk()
                limits.append(message)
                if not ok:
                    reregister = False
                    break
            if reregister:
                self.log.info("Re-registering as job is within its limits "
                              "{}".format(", ".join(limits)))
                self.register_work()
        if self.statsd:
            base_key = 'zuul.executor.{hostname}'
            for sensor in self.sensors:
                sensor.reportStats(self.statsd, base_key)
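
    # Editorial note (not part of the original source): registration flaps
    # between the two states above -- any single failing sensor (CPU, HDD,
    # pause, RAM, or starting-builds) unregisters the executor from
    # "executor:execute", and it re-registers only once every sensor
    # reports OK again.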

    def finishJob(self, unique):
        del self.job_workers[unique]

    def stopJobDiskFull(self, jobdir):
        unique = os.path.basename(jobdir)
        self.stopJobByUnique(unique, reason=AnsibleJob.RESULT_DISK_FULL)

    def resumeJob(self, job):
        try:
            args = json.loads(job.arguments)
            self.log.debug("Resume job with arguments: %s" % (args,))
            unique = args['uuid']
            self.resumeJobByUnique(unique)
        finally:
            job.sendWorkComplete()

    def stopJob(self, job):
        try:
            args = json.loads(job.arguments)
            self.log.debug("Stop job with arguments: %s" % (args,))
            unique = args['uuid']
            self.stopJobByUnique(unique)
        finally:
            job.sendWorkComplete()

    def resumeJobByUnique(self, unique):
        job_worker = self.job_workers.get(unique)
        if not job_worker:
            self.log.debug("Unable to find worker for job %s" % (unique,))
            return
        try:
            job_worker.resume()
        except Exception:
            self.log.exception("Exception sending resume command "
                               "to worker:")

    def stopJobByUnique(self, unique, reason=None):
        job_worker = self.job_workers.get(unique)
        if not job_worker:
            self.log.debug("Unable to find worker for job %s" % (unique,))
            return
        try:
            job_worker.stop(reason)
        except Exception:
            self.log.exception("Exception sending stop command "
                               "to worker:")

    def cat(self, job):
        args = json.loads(job.arguments)
        task = self.update(args['connection'], args['project'])
        task.wait()
        with self.merger_lock:
            files = self.merger.getFiles(args['connection'], args['project'],
                                         args['branch'], args['files'],
                                         args.get('dirs', []))
        result = dict(updated=True,
                      files=files)
        job.sendWorkComplete(json.dumps(result))

    def fileschanges(self, job):
        args = json.loads(job.arguments)
        task = self.update(args['connection'], args['project'])
        task.wait()
        with self.merger_lock:
            files = self.merger.getFilesChanges(
                args['connection'], args['project'],
                args['branch'],
                args['tosha'])
        result = dict(updated=True,
                      files=files)
        job.sendWorkComplete(json.dumps(result))

    def refstate(self, job):
        args = json.loads(job.arguments)
        with self.merger_lock:
            success, repo_state = self.merger.getRepoState(args['items'])
        result = dict(updated=success,
                      repo_state=repo_state)
        job.sendWorkComplete(json.dumps(result))

    def merge(self, job):
        args = json.loads(job.arguments)
        with self.merger_lock:
            ret = self.merger.mergeChanges(args['items'], args.get('files'),
                                           args.get('dirs', []),
                                           args.get('repo_state'))
        result = dict(merged=(ret is not None))
        if ret is None:
            result['commit'] = result['files'] = result['repo_state'] = None
        else:
            (result['commit'], result['files'], result['repo_state'],
             recent, orig_commit) = ret
        job.sendWorkComplete(json.dumps(result))