Manage a pool of nodes for a distributed test infrastructure

builder.py 50KB

#!/usr/bin/env python
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import os
import shutil
import socket
import subprocess
import threading
import time
import shlex
import uuid

from nodepool import config as nodepool_config
from nodepool import exceptions
from nodepool import provider_manager
from nodepool import stats
from nodepool import zk


MINS = 60
HOURS = 60 * MINS

# How long to wait for an image save
IMAGE_TIMEOUT = 6 * HOURS

# How long to wait between checks for ZooKeeper connectivity if it disappears.
SUSPEND_WAIT_TIME = 30

# HP Cloud requires qemu compat with 0.10. That version works elsewhere,
# so just hardcode it for all qcow2 building
DEFAULT_QEMU_IMAGE_COMPAT_OPTIONS = "--qemu-img-options 'compat=0.10'"


class DibImageFile(object):
    '''
    Class used as an API to find locally built DIB image files, and
    also used to represent the found files. Image files are named using
    a unique ID, but can be available in multiple formats (with different
    extensions).
    '''
    def __init__(self, image_id, extension=None):
        self.image_id = image_id
        self.extension = extension
        self.md5 = None
        self.md5_file = None
        self.sha256 = None
        self.sha256_file = None

    @staticmethod
    def from_path(path):
        image_file = os.path.basename(path)
        image_id, extension = image_file.rsplit('.', 1)
        return DibImageFile(image_id, extension)

    @staticmethod
    def from_image_id(images_dir, image_id):
        images = []
        for image_filename in os.listdir(images_dir):
            if os.path.isfile(os.path.join(images_dir, image_filename)):
                image = DibImageFile.from_path(image_filename)
                if image.image_id == image_id:
                    images.append(image)
        return images

    @staticmethod
    def from_images_dir(images_dir):
        return [DibImageFile.from_path(x) for x in os.listdir(images_dir)]

    def to_path(self, images_dir, with_extension=True):
        my_path = os.path.join(images_dir, self.image_id)
        if with_extension:
            if self.extension is None:
                raise exceptions.BuilderError(
                    'Cannot specify image extension of None'
                )
            my_path += '.' + self.extension

        md5_path = '%s.%s' % (my_path, 'md5')
        md5 = self._checksum(md5_path)
        if md5:
            self.md5_file = md5_path
            self.md5 = md5[0:32]

        sha256_path = '%s.%s' % (my_path, 'sha256')
        sha256 = self._checksum(sha256_path)
        if sha256:
            self.sha256_file = sha256_path
            self.sha256 = sha256[0:64]

        return my_path

    def _checksum(self, filename):
        if not os.path.isfile(filename):
            return None
        with open(filename, 'r') as f:
            data = f.read()
        return data


class BaseWorker(threading.Thread):
    def __init__(self, builder_id, config_path, secure_path, interval, zk):
        super(BaseWorker, self).__init__()
        self.log = logging.getLogger("nodepool.builder.BaseWorker")
        self.daemon = True
        self._running = False
        self._stop_event = threading.Event()
        self._config = None
        self._config_path = config_path
        self._secure_path = secure_path
        self._zk = zk
        self._hostname = socket.gethostname()
        self._statsd = stats.get_client()
        self._interval = interval
        self._builder_id = builder_id

    def _checkForZooKeeperChanges(self, new_config):
        '''
        Check config for ZooKeeper cluster changes.

        If the defined set of ZooKeeper servers changes, the connection
        will use the new server set.
        '''
        if self._config.zookeeper_servers != new_config.zookeeper_servers:
            self.log.debug("Detected ZooKeeper server changes")
            self._zk.resetHosts(list(new_config.zookeeper_servers.values()))

    @property
    def running(self):
        return self._running

    def shutdown(self):
        self._running = False
        self._stop_event.set()


class CleanupWorker(BaseWorker):
    '''
    The janitor of nodepool-builder that will remove images from providers
    and any local DIB builds.
    '''
    def __init__(self, name, builder_id, config_path, secure_path,
                 interval, zk):
        super(CleanupWorker, self).__init__(builder_id, config_path,
                                            secure_path, interval, zk)
        self.log = logging.getLogger(
            "nodepool.builder.CleanupWorker.%s" % name)
        self.name = 'CleanupWorker.%s' % name

    def _buildUploadRecencyTable(self):
        '''
        Builds a table for each image of the most recent uploads to each
        provider.

        Example)
            image1:
                providerA: [ (build_id, upload_id, upload_time), ... ]
                providerB: [ (build_id, upload_id, upload_time), ... ]
            image2:
                providerC: [ (build_id, upload_id, upload_time), ... ]
        '''
        self._rtable = {}
        for image in self._zk.getImageNames():
            self._rtable[image] = {}
            for build in self._zk.getBuilds(image, zk.READY):
                for provider in self._zk.getBuildProviders(image, build.id):
                    if provider not in self._rtable[image]:
                        self._rtable[image][provider] = []
                    uploads = self._zk.getMostRecentBuildImageUploads(
                        2, image, build.id, provider, zk.READY)
                    for upload in uploads:
                        self._rtable[image][provider].append(
                            (build.id, upload.id, upload.state_time)
                        )

        # Sort uploads by state_time (upload time) and keep the 2 most recent
        for i in list(self._rtable.keys()):
            for p in self._rtable[i].keys():
                self._rtable[i][p].sort(key=lambda x: x[2], reverse=True)
                self._rtable[i][p] = self._rtable[i][p][:2]

        self.log.debug("Upload recency table: %s", self._rtable)

    def _isRecentUpload(self, image, provider, build_id, upload_id):
        '''
        Search for an upload for a build within the recency table.
        '''
        provider = self._rtable[image].get(provider)
        if not provider:
            return False
        for b_id, u_id, u_time in provider:
            if build_id == b_id and upload_id == u_id:
                return True
        return False

    def _inProgressUpload(self, upload):
        '''
        Determine if an upload is in progress.
        '''
        if upload.state != zk.UPLOADING:
            return False
        try:
            with self._zk.imageUploadLock(upload.image_name, upload.build_id,
                                          upload.provider_name,
                                          blocking=False):
                pass
        except exceptions.ZKLockException:
            return True
        return False

    def _removeDibItem(self, filename):
        if filename is None:
            return
        try:
            os.remove(filename)
            self.log.info("Removed DIB file %s" % filename)
        except OSError as e:
            if e.errno != 2:    # No such file or directory
                raise e

    def _deleteLocalBuild(self, image, build):
        '''
        Remove expired image build from local disk.

        :param str image: Name of the image whose build we are deleting.
        :param ImageBuild build: The build we want to delete.

        :returns: True if files were deleted, False if none were found.
        '''
        base = "-".join([image, build.id])
        files = DibImageFile.from_image_id(self._config.imagesdir, base)
        if not files:
            # NOTE(pabelanger): It is possible we don't have any files because
            # diskimage-builder failed. So, check to see if we have the
            # correct builder so we can remove the data from zookeeper.

            # To maintain backward compatibility with builders that didn't
            # use unique builder IDs before, but do now, always compare to
            # hostname as well since some ZK data may still reference that.
            if (build.builder_id == self._builder_id or
                build.builder == self._hostname
            ):
                return True
            return False

        self.log.info("Doing cleanup for %s:%s" % (image, build.id))

        manifest_dir = None

        for f in files:
            filename = f.to_path(self._config.imagesdir, True)
            if not manifest_dir:
                path, ext = filename.rsplit('.', 1)
                manifest_dir = path + ".d"
            items = [filename, f.md5_file, f.sha256_file]
            list(map(self._removeDibItem, items))

        try:
            shutil.rmtree(manifest_dir)
            self.log.info("Removed DIB manifest %s" % manifest_dir)
        except OSError as e:
            if e.errno != 2:    # No such file or directory
                raise e

        return True

    def _cleanupProvider(self, provider, image, build_id):
        all_uploads = self._zk.getUploads(image, build_id, provider.name)

        for upload in all_uploads:
            if self._isRecentUpload(image, provider.name, build_id, upload.id):
                continue
            self._deleteUpload(upload)

    def _cleanupObsoleteProviderUploads(self, provider, image, build_id):
        if image in provider.diskimages:
            # This image is in use for this provider
            return

        all_uploads = self._zk.getUploads(image, build_id, provider.name)
        for upload in all_uploads:
            self._deleteUpload(upload)

    def _deleteUpload(self, upload):
        deleted = False

        if upload.state != zk.DELETING:
            if not self._inProgressUpload(upload):
                data = zk.ImageUpload()
                data.state = zk.DELETING
                self._zk.storeImageUpload(upload.image_name, upload.build_id,
                                          upload.provider_name, data,
                                          upload.id)
                deleted = True

        if upload.state == zk.DELETING or deleted:
            manager = self._config.provider_managers[upload.provider_name]
            try:
                # It is possible we got this far, but don't actually have an
                # external_name. This could mean that zookeeper and the cloud
                # provider are somehow out of sync.
                if upload.external_name:
                    base = "-".join([upload.image_name, upload.build_id])
                    self.log.info("Deleting image build %s from %s" %
                                  (base, upload.provider_name))
                    manager.deleteImage(upload.external_name)
            except Exception:
                self.log.exception(
                    "Unable to delete image %s from %s:",
                    upload.external_name, upload.provider_name)
            else:
                self.log.debug("Deleting image upload: %s", upload)
                self._zk.deleteUpload(upload.image_name, upload.build_id,
                                      upload.provider_name, upload.id)

    def _inProgressBuild(self, build, image):
        '''
        Determine if a DIB build is in progress.
        '''
        if build.state != zk.BUILDING:
            return False
        try:
            with self._zk.imageBuildLock(image, blocking=False):
                # An additional state check is needed to make sure it hasn't
                # changed on us. If it has, then let's pretend a build is
                # still in progress so that it is checked again later with
                # its new build state.
                b = self._zk.getBuild(image, build.id)
                if b.state != zk.BUILDING:
                    return True
                pass
        except exceptions.ZKLockException:
            return True
        return False

    def _cleanup(self):
        '''
        Clean up builds on disk and in providers.
        '''
        known_providers = self._config.providers.values()
        image_names = self._zk.getImageNames()

        self._buildUploadRecencyTable()

        for image in image_names:
            try:
                self._cleanupImage(known_providers, image)
            except Exception:
                self.log.exception("Exception cleaning up image %s:", image)

    def _filterLocalBuilds(self, image, builds):
        '''Return the subset of builds that are local'''
        ret = []
        for build in builds:
            base = "-".join([image, build.id])
            files = DibImageFile.from_image_id(self._config.imagesdir, base)
            if files:
                ret.append(build)
        return ret

    def _cleanupCurrentProviderUploads(self, provider, image, build_id):
        '''
        Remove cruft from a current build.

        Current builds (the ones we want to keep) are treated specially since
        we want to remove any ZK nodes for uploads that failed exceptionally
        hard (i.e., we could not set the state to FAILED and they remain as
        UPLOADING), and we also want to remove any uploads that have been
        marked for deleting.
        '''
        cruft = self._zk.getUploads(image, build_id, provider,
                                    states=[zk.UPLOADING,
                                            zk.DELETING,
                                            zk.FAILED])
        for upload in cruft:
            if (upload.state == zk.UPLOADING and
                not self._inProgressUpload(upload)
            ):
                # Since we cache the uploads above, we need to verify the
                # state hasn't changed on us (e.g., it could have gone from
                # an in progress upload to a successfully completed upload
                # between the getUploads() and the _inProgressUpload() check).
                u = self._zk.getImageUpload(image, build_id, provider,
                                            upload.id)
                if upload.state != u.state:
                    continue
                self.log.debug("Removing failed upload record: %s" % upload)
                self._zk.deleteUpload(image, build_id, provider, upload.id)
            elif upload.state == zk.DELETING:
                self.log.debug(
                    "Removing deleted upload and record: %s" % upload)
                self._deleteUpload(upload)
            elif upload.state == zk.FAILED:
                self.log.debug(
                    "Removing failed upload and record: %s" % upload)
                self._deleteUpload(upload)

    def _cleanupImage(self, known_providers, image):
        '''
        Clean up one image.
        '''
        # Get the list of all builds, then work from that so that we
        # have a consistent view of the data.
        all_builds = self._zk.getBuilds(image)
        builds_to_keep = set([b for b in sorted(all_builds, reverse=True,
                                                key=lambda y: y.state_time)
                              if b.state == zk.READY][:2])
        local_builds = set(self._filterLocalBuilds(image, all_builds))
        diskimage = self._config.diskimages.get(image)
        if not diskimage and not local_builds:
            # This builder is not and was not responsible for this image,
            # so ignore it.
            return
        # Remove any local builds that are not in use.
        if not diskimage or (diskimage and not diskimage.image_types):
            builds_to_keep -= local_builds
            # TODO(jeblair): When all builds for an image which is not
            # in use are deleted, the image znode should be deleted as
            # well.

        for build in all_builds:
            # Start by deleting any uploads that are no longer needed
            # because this image has been removed from a provider
            # (since this should be done regardless of the build
            # state).
            for provider in known_providers:
                if not provider.manage_images:
                    # This provider doesn't manage images
                    continue
                try:
                    self._cleanupObsoleteProviderUploads(provider, image,
                                                         build.id)
                    if build in builds_to_keep:
                        self._cleanupCurrentProviderUploads(provider.name,
                                                            image,
                                                            build.id)
                except Exception:
                    self.log.exception("Exception cleaning up uploads "
                                       "of build %s of image %s in "
                                       "provider %s:",
                                       build, image, provider)

            # If the build is in the delete state, we will try to
            # delete the entire thing regardless.
            if build.state != zk.DELETING:
                # If it is in any other state, we will only delete it
                # if it is older than the most recent two ready
                # builds, or is in the building state but not actually
                # building.
                if build in builds_to_keep:
                    continue
                elif self._inProgressBuild(build, image):
                    continue

            for provider in known_providers:
                if not provider.manage_images:
                    # This provider doesn't manage images
                    continue
                try:
                    self._cleanupProvider(provider, image, build.id)
                except Exception:
                    self.log.exception("Exception cleaning up build %s "
                                       "of image %s in provider %s:",
                                       build, image, provider)

            uploads_exist = False
            for p in self._zk.getBuildProviders(image, build.id):
                if self._zk.getImageUploadNumbers(image, build.id, p):
                    uploads_exist = True
                    break

            if not uploads_exist:
                if build.state != zk.DELETING:
                    with self._zk.imageBuildNumberLock(
                        image, build.id, blocking=False
                    ):
                        build.state = zk.DELETING
                        self._zk.storeBuild(image, build, build.id)

                # Release the lock here so we can delete the build znode
                if self._deleteLocalBuild(image, build):
                    if not self._zk.deleteBuild(image, build.id):
                        self.log.error("Unable to delete build %s because"
                                       " uploads still remain.", build)

    def run(self):
        '''
        Start point for the CleanupWorker thread.
        '''
        self._running = True
        while self._running:
            # Don't do work if we've lost communication with the ZK cluster
            did_suspend = False
            while self._zk and (self._zk.suspended or self._zk.lost):
                did_suspend = True
                self.log.info("ZooKeeper suspended. Waiting")
                time.sleep(SUSPEND_WAIT_TIME)
            if did_suspend:
                self.log.info("ZooKeeper available. Resuming")

            try:
                self._run()
            except Exception:
                self.log.exception("Exception in CleanupWorker:")
                time.sleep(10)

            self._stop_event.wait(self._interval)

        provider_manager.ProviderManager.stopProviders(self._config)

    def _run(self):
        '''
        Body of run method for exception handling purposes.
        '''
        new_config = nodepool_config.loadConfig(self._config_path)
        if self._secure_path:
            nodepool_config.loadSecureConfig(new_config, self._secure_path)
        if not self._config:
            self._config = new_config

        self._checkForZooKeeperChanges(new_config)
        provider_manager.ProviderManager.reconfigure(self._config, new_config,
                                                     self._zk,
                                                     use_taskmanager=False,
                                                     only_image_manager=True)
        self._config = new_config

        self._cleanup()


class BuildWorker(BaseWorker):
    def __init__(self, name, builder_id, config_path, secure_path,
                 interval, zk, dib_cmd):
        super(BuildWorker, self).__init__(builder_id, config_path, secure_path,
                                          interval, zk)
        self.log = logging.getLogger("nodepool.builder.BuildWorker.%s" % name)
        self.name = 'BuildWorker.%s' % name
        self.dib_cmd = dib_cmd

    def _getBuildLogRoot(self, name):
        log_dir = self._config.build_log_dir
        if not log_dir:
            log_dir = '/var/log/nodepool/builds'
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        return log_dir

    def _pruneBuildLogs(self, name):
        log_dir = self._getBuildLogRoot(name)
        keep = max(self._config.build_log_retention, 1)
        existing = sorted(os.listdir(log_dir))
        existing = [f for f in existing if f.startswith(name)]
        delete = existing[:0 - keep]
        for f in delete:
            fp = os.path.join(log_dir, f)
            self.log.info("Deleting old build log %s" % (fp,))
            os.unlink(fp)

    def _getBuildLog(self, name, build_id):
        log_dir = self._getBuildLogRoot(name)
        return os.path.join(log_dir, '%s-%s.log' % (name, build_id))

    def _checkForScheduledImageUpdates(self):
        '''
        Check every DIB image to see if it has aged out and needs to be
        rebuilt.
        '''
        for diskimage in self._config.diskimages.values():
            # Check if we've been told to shutdown
            # or if ZK connection is suspended
            if not self._running or self._zk.suspended or self._zk.lost:
                return
            try:
                self._checkImageForScheduledImageUpdates(diskimage)
            except Exception:
                self.log.exception("Exception checking for scheduled "
                                   "update of diskimage %s",
                                   diskimage.name)

    def _checkImageForScheduledImageUpdates(self, diskimage):
        '''
        Check one DIB image to see if it needs to be rebuilt.

        .. note:: It's important to lock the image build before we check
            the state time and then build to eliminate any race condition.
        '''
        # Check if diskimage builds are paused.
        if diskimage.pause:
            return

        if not diskimage.image_types:
            # We don't know what formats to build.
            return

        now = int(time.time())
        builds = self._zk.getMostRecentBuilds(1, diskimage.name, zk.READY)

        # If there is no build for this image, or it has aged out
        # or if the current build is missing an image type from
        # the config file, start a new build.
        if (not builds
            or (now - builds[0].state_time) >= diskimage.rebuild_age
            or not set(builds[0].formats).issuperset(diskimage.image_types)
        ):
            try:
                with self._zk.imageBuildLock(diskimage.name, blocking=False):
                    # To avoid locking each image repeatedly, we have a
                    # second, redundant check here to verify that a new
                    # build didn't appear between the first check and the
                    # lock acquisition. If it's not the same build as
                    # identified in the first check above, assume another
                    # BuildWorker created the build for us and continue.
                    builds2 = self._zk.getMostRecentBuilds(
                        1, diskimage.name, zk.READY)
                    if builds2 and builds[0].id != builds2[0].id:
                        return

                    self.log.info("Building image %s" % diskimage.name)

                    data = zk.ImageBuild()
                    data.state = zk.BUILDING
                    data.builder_id = self._builder_id
                    data.builder = self._hostname
                    data.formats = list(diskimage.image_types)

                    bnum = self._zk.storeBuild(diskimage.name, data)
                    data = self._buildImage(bnum, diskimage)
                    self._zk.storeBuild(diskimage.name, data, bnum)
            except exceptions.ZKLockException:
                # Lock is already held. Skip it.
                pass

    def _checkForManualBuildRequest(self):
        '''
        Query ZooKeeper for any manual image build requests.
        '''
        for diskimage in self._config.diskimages.values():
            # Check if we've been told to shutdown
            # or if ZK connection is suspended
            if not self._running or self._zk.suspended or self._zk.lost:
                return
            try:
                self._checkImageForManualBuildRequest(diskimage)
            except Exception:
                self.log.exception("Exception checking for manual "
                                   "update of diskimage %s",
                                   diskimage)

    def _checkImageForManualBuildRequest(self, diskimage):
        '''
        Query ZooKeeper for a manual image build request for one image.
        '''
        # Check if diskimage builds are paused.
        if diskimage.pause:
            return

        # Reduce use of locks by adding an initial check here and
        # a redundant check after lock acquisition.
        if not self._zk.hasBuildRequest(diskimage.name):
            return

        try:
            with self._zk.imageBuildLock(diskimage.name, blocking=False):
                # Redundant check
                if not self._zk.hasBuildRequest(diskimage.name):
                    return

                self.log.info(
                    "Manual build request for image %s" % diskimage.name)

                data = zk.ImageBuild()
                data.state = zk.BUILDING
                data.builder_id = self._builder_id
                data.builder = self._hostname
                data.formats = list(diskimage.image_types)

                bnum = self._zk.storeBuild(diskimage.name, data)
                data = self._buildImage(bnum, diskimage)
                self._zk.storeBuild(diskimage.name, data, bnum)

                # Remove request on a successful build
                if data.state == zk.READY:
                    self._zk.removeBuildRequest(diskimage.name)
        except exceptions.ZKLockException:
            # Lock is already held. Skip it.
            pass

    def _buildImage(self, build_id, diskimage):
        '''
        Run the external command to build the diskimage.

        :param str build_id: The ID for the build (used in image filename).
        :param diskimage: The diskimage as retrieved from our config file.

        :returns: An ImageBuild object of build-related data.

        :raises: BuilderError if we failed to execute the build command.
        '''
        base = "-".join([diskimage.name, build_id])
        image_file = DibImageFile(base)
        filename = image_file.to_path(self._config.imagesdir, False)

        env = os.environ.copy()
        env['DIB_RELEASE'] = diskimage.release
        env['DIB_IMAGE_NAME'] = diskimage.name
        env['DIB_IMAGE_FILENAME'] = filename

        # Note we use a reference to the nodepool config here so
        # that whenever the config is updated we get up to date
        # values in this thread.
        if self._config.elementsdir:
            env['ELEMENTS_PATH'] = self._config.elementsdir

        # send additional env vars if needed
        for k, v in diskimage.env_vars.items():
            env[k] = v

        img_elements = diskimage.elements
        img_types = ",".join(diskimage.image_types)

        qemu_img_options = ''
        if 'qcow2' in img_types:
            qemu_img_options = DEFAULT_QEMU_IMAGE_COMPAT_OPTIONS

        log_fn = self._getBuildLog(diskimage.name, build_id)

        cmd = ('%s -x -t %s --checksum --no-tmpfs %s -o %s --logfile %s %s' %
               (self.dib_cmd, img_types, qemu_img_options, filename,
                log_fn, img_elements))

        self._pruneBuildLogs(diskimage.name)

        self.log.info('Running %s' % (cmd,))
        self.log.info('Logging to %s' % (log_fn,))

        start_time = time.monotonic()

        # We used to use readline() on stdout to output the lines to the
        # build log. Unfortunately, this would block as long as the process
        # ran (with no easy way to timeout the read) and wedge the builder.
        # Now we use the --logfile option to the dib command and set a
        # timeout on the wait() call to prevent the wedge.
        did_timeout = False

        try:
            p = subprocess.Popen(
                shlex.split(cmd),
                stderr=subprocess.STDOUT,
                env=env)
        except OSError as e:
            raise exceptions.BuilderError(
                "Failed to exec '%s'. Error: '%s'" % (cmd, e.strerror)
            )

        try:
            rc = p.wait(timeout=diskimage.build_timeout)
        except subprocess.TimeoutExpired:
            p.kill()
            did_timeout = True
            rc = 1
            self.log.error(
                "Build timeout for image %s, build %s (log: %s)",
                diskimage.name, build_id, log_fn)
        else:
            # Append return code to dib's log file
            with open(log_fn, 'ab') as log:
                m = "Exit code: %s\n" % rc
                log.write(m.encode('utf8'))

        # It's possible the connection to the ZK cluster could have been
        # interrupted during the build. If so, wait for it to return.
        # It could transition directly from SUSPENDED to CONNECTED, or go
        # through the LOST state before CONNECTED.
        did_suspend = False
        while self._zk.suspended or self._zk.lost:
            did_suspend = True
            self.log.info("ZooKeeper suspended during build. Waiting")
            time.sleep(SUSPEND_WAIT_TIME)
        if did_suspend:
            self.log.info("ZooKeeper available. Resuming")

        build_time = time.monotonic() - start_time

        build_data = zk.ImageBuild()
        build_data.builder_id = self._builder_id
        build_data.builder = self._hostname
        build_data.username = diskimage.username

        if self._zk.didLoseConnection:
            self.log.info("ZooKeeper lost while building %s" % diskimage.name)
            self._zk.resetLostFlag()
            build_data.state = zk.FAILED
        elif p.returncode or did_timeout:
            self.log.info(
                "DIB failed creating %s (%s) (timeout=%s)" % (
                    diskimage.name, p.returncode, did_timeout))
            build_data.state = zk.FAILED
        else:
            self.log.info("DIB image %s is built" % diskimage.name)
            build_data.state = zk.READY
            build_data.formats = list(diskimage.image_types)

            if self._statsd:
                # record stats on the size of each image we create
                for ext in img_types.split(','):
                    key = 'nodepool.dib_image_build.%s.%s.size' % (
                        diskimage.name, ext)
                    # A bit tricky because these image files may be sparse
                    # files; we only want the true size of the file for
                    # purposes of watching if we've added too much stuff
                    # into the image. Note that st_blocks is defined as
                    # 512-byte blocks by stat(2)
                    size = os.stat("%s.%s" % (filename, ext)).st_blocks * 512
                    self.log.debug("%s created image %s.%s (size: %d) " %
                                   (diskimage.name, filename, ext, size))
                    self._statsd.gauge(key, size)

        if self._statsd:
            # report result to statsd
            for ext in img_types.split(','):
                key_base = 'nodepool.dib_image_build.%s.%s' % (
                    diskimage.name, ext)
                self._statsd.gauge(key_base + '.rc', rc)
                self._statsd.timing(key_base + '.duration',
                                    int(build_time * 1000))

        return build_data

    def run(self):
        '''
        Start point for the BuildWorker thread.
        '''
        self._running = True
        while self._running:
            # Don't do work if we've lost communication with the ZK cluster
            did_suspend = False
            while self._zk and (self._zk.suspended or self._zk.lost):
                did_suspend = True
                self.log.info("ZooKeeper suspended. Waiting")
                time.sleep(SUSPEND_WAIT_TIME)
            if did_suspend:
                self.log.info("ZooKeeper available. Resuming")

            try:
                self._run()
            except Exception:
                self.log.exception("Exception in BuildWorker:")
                time.sleep(10)

            self._stop_event.wait(self._interval)

    def _run(self):
        '''
        Body of run method for exception handling purposes.
        '''
        # NOTE: For the first iteration, we expect self._config to be None
        new_config = nodepool_config.loadConfig(self._config_path)
        if self._secure_path:
            nodepool_config.loadSecureConfig(new_config, self._secure_path)
        if not self._config:
            self._config = new_config

        self._checkForZooKeeperChanges(new_config)
        self._config = new_config

        self._checkForScheduledImageUpdates()
        self._checkForManualBuildRequest()


class UploadWorker(BaseWorker):
    def __init__(self, name, builder_id, config_path, secure_path,
                 interval, zk):
        super(UploadWorker, self).__init__(builder_id, config_path,
                                           secure_path, interval, zk)
        self.log = logging.getLogger("nodepool.builder.UploadWorker.%s" % name)
        self.name = 'UploadWorker.%s' % name

    def _reloadConfig(self):
        '''
        Reload the nodepool configuration file.
        '''
        new_config = nodepool_config.loadConfig(self._config_path)
        if self._secure_path:
            nodepool_config.loadSecureConfig(new_config, self._secure_path)
        if not self._config:
            self._config = new_config

        self._checkForZooKeeperChanges(new_config)
        provider_manager.ProviderManager.reconfigure(self._config, new_config,
                                                     self._zk,
                                                     use_taskmanager=False,
                                                     only_image_manager=True)
        self._config = new_config

    def _uploadImage(self, build_id, upload_id, image_name, images, provider,
                     username):
        '''
        Upload a local DIB image build to a provider.

        :param str build_id: Unique ID of the image build to upload.
        :param str upload_id: Unique ID of the upload.
        :param str image_name: Name of the diskimage.
        :param list images: A list of DibImageFile objects from this build
            that are available for uploading.
        :param provider: The provider from the parsed config file.
        :param username:
        '''
        start_time = time.time()
        timestamp = int(start_time)

        image = None
        for i in images:
            if provider.image_type == i.extension:
                image = i
                break

        if not image:
            raise exceptions.BuilderInvalidCommandError(
                "Unable to find image file of type %s for id %s to upload" %
                (provider.image_type, build_id)
            )

        self.log.debug("Found image file of type %s for image id: %s" %
                       (image.extension, image.image_id))

        filename = image.to_path(self._config.imagesdir, with_extension=True)

        ext_image_name = provider.image_name_format.format(
            image_name=image_name, timestamp=str(timestamp)
        )

        self.log.info("Uploading DIB image build %s from %s to %s" %
                      (build_id, filename, provider.name))

        manager = self._config.provider_managers[provider.name]
        provider_image = provider.diskimages.get(image_name)
        if provider_image is None:
            raise exceptions.BuilderInvalidCommandError(
                "Could not find matching provider image for %s" % image_name
            )

        meta = provider_image.meta.copy()
        meta['nodepool_build_id'] = build_id
        meta['nodepool_upload_id'] = upload_id

        try:
            external_id = manager.uploadImage(
                ext_image_name, filename,
                image_type=image.extension,
                meta=meta,
                md5=image.md5,
                sha256=image.sha256,
            )
        except Exception:
            self.log.exception("Failed to upload image %s to provider %s" %
                               (image_name, provider.name))
            data = zk.ImageUpload()
            data.state = zk.FAILED
            return data

        if self._statsd:
            dt = int((time.time() - start_time) * 1000)
            key = 'nodepool.image_update.%s.%s' % (image_name,
                                                   provider.name)
            self._statsd.timing(key, dt)
            self._statsd.incr(key)

        base = "-".join([image_name, build_id])
        self.log.info("Image build %s in %s is ready" %
                      (base, provider.name))

        data = zk.ImageUpload()
        data.state = zk.READY
        data.external_id = external_id
        data.external_name = ext_image_name
        data.format = image.extension
        data.username = username

        return data

    def _checkForProviderUploads(self):
        '''
        Check for any image builds that need to be uploaded to providers.

        If we find any builds in the 'ready' state that haven't been uploaded
        to providers, do the upload if they are available on the local disk.
        '''
        for provider in self._config.providers.values():
            if not provider.manage_images:
                continue
            for image in provider.diskimages.values():
                uploaded = False

                # Check if we've been told to shutdown
                # or if ZK connection is suspended
                if not self._running or self._zk.suspended or self._zk.lost:
                    return
                try:
                    uploaded = self._checkProviderImageUpload(provider, image)
                except Exception:
                    self.log.exception("Error uploading image %s "
                                       "to provider %s:",
                                       image.name, provider.name)

                # NOTE: Due to the configuration file disagreement issue
                # (the copy we have may not be current), if we took the time
                # to attempt to upload an image, let's short-circuit this loop
                # to give us a chance to reload the configuration file.
                if uploaded:
                    return

    def _checkProviderImageUpload(self, provider, image):
        '''
        The main body of _checkForProviderUploads. This encapsulates
        checking whether an image for a provider should be uploaded
        and performing the upload. It is a separate function so that
        exception handling can treat all provider-image uploads
        independently.

        :returns: True if an upload was attempted, False otherwise.
        '''
        # Check if image uploads are paused.
        if provider.diskimages.get(image.name).pause:
            return False

        # Search for the most recent 'ready' image build
        builds = self._zk.getMostRecentBuilds(1, image.name,
                                              zk.READY)
        if not builds:
            return False

        build = builds[0]

        # Search for locally built images. The image name and build
        # sequence ID is used to name the image.
        local_images = DibImageFile.from_image_id(
            self._config.imagesdir, "-".join([image.name, build.id]))
        if not local_images:
            return False

        # See if this image has already been uploaded
        upload = self._zk.getMostRecentBuildImageUploads(
            1, image.name, build.id, provider.name, zk.READY)
        if upload:
            return False

        # See if this provider supports the available image formats
        if provider.image_type not in build.formats:
            return False

        try:
            with self._zk.imageUploadLock(
                image.name, build.id, provider.name,
                blocking=False
            ):
                # Verify once more that it hasn't been uploaded since the
                # last check.
                upload = self._zk.getMostRecentBuildImageUploads(
                    1, image.name, build.id, provider.name, zk.READY)
                if upload:
                    return False

                # NOTE: Due to the configuration file disagreement issue
                # (the copy we have may not be current), we try to verify
                # that another thread isn't trying to delete this build just
                # before we upload.
                b = self._zk.getBuild(image.name, build.id)
                if b.state == zk.DELETING:
                    return False

                # New upload number with initial state 'uploading'
                data = zk.ImageUpload()
                data.state = zk.UPLOADING
                data.username = build.username

                upnum = self._zk.storeImageUpload(
                    image.name, build.id, provider.name, data)

                data = self._uploadImage(build.id, upnum, image.name,
                                         local_images, provider,
                                         build.username)

                # Set final state
                self._zk.storeImageUpload(image.name, build.id,
                                          provider.name, data, upnum)
            return True
        except exceptions.ZKLockException:
            # Lock is already held. Skip it.
            return False

    def run(self):
        '''
        Start point for the UploadWorker thread.
        '''
        self._running = True
        while self._running:
            # Don't do work if we've lost communication with the ZK cluster
            did_suspend = False
            while self._zk and (self._zk.suspended or self._zk.lost):
                did_suspend = True
                self.log.info("ZooKeeper suspended. Waiting")
                time.sleep(SUSPEND_WAIT_TIME)
            if did_suspend:
                self.log.info("ZooKeeper available. Resuming")

            try:
                self._reloadConfig()
                self._checkForProviderUploads()
            except Exception:
                self.log.exception("Exception in UploadWorker:")
                time.sleep(10)

            self._stop_event.wait(self._interval)

        provider_manager.ProviderManager.stopProviders(self._config)


class NodePoolBuilder(object):
    '''
    Main class for the Nodepool Builder.

    The builder has the responsibility to:

        * Start and maintain the working state of each worker thread.
    '''
    log = logging.getLogger("nodepool.builder.NodePoolBuilder")

    def __init__(self, config_path, secure_path=None,
                 num_builders=1, num_uploaders=4, fake=False):
        '''
        Initialize the NodePoolBuilder object.

        :param str config_path: Path to configuration file.
        :param str secure_path: Path to secure configuration file.
        :param int num_builders: Number of build workers to start.
        :param int num_uploaders: Number of upload workers to start.
        :param bool fake: Whether to fake the image builds.
        '''
        self._config_path = config_path
        self._secure_path = secure_path
        self._config = None
        self._num_builders = num_builders
        self._build_workers = []
        self._num_uploaders = num_uploaders
        self._upload_workers = []
        self._janitor = None
        self._running = False
        self.cleanup_interval = 60
        self.build_interval = 10
        self.upload_interval = 10
        if fake:
            self.dib_cmd = os.path.join(os.path.dirname(__file__), '..',
                                        'nodepool/tests/fake-image-create')
        else:
            self.dib_cmd = 'disk-image-create'
        self.zk = None

        # This lock is needed because the run() method is started in a
        # separate thread of control, which can return before the scheduler
        # has completed startup. We need to avoid shutting down before the
        # startup process has completed.
        self._start_lock = threading.Lock()

    # ======================================================================
    # Private methods
    # ======================================================================

    def _getBuilderID(self, id_file):
        if not os.path.exists(id_file):
            with open(id_file, "w") as f:
                builder_id = str(uuid.uuid4())
                f.write(builder_id)
            return builder_id

        with open(id_file, "r") as f:
            builder_id = f.read()
        return builder_id

    def _getAndValidateConfig(self):
        config = nodepool_config.loadConfig(self._config_path)
        if self._secure_path:
            nodepool_config.loadSecureConfig(config, self._secure_path)
        if not config.zookeeper_servers.values():
            raise RuntimeError('No ZooKeeper servers specified in config.')
        if not config.imagesdir:
            raise RuntimeError('No images-dir specified in config.')
        return config

    # ======================================================================
    # Public methods
    # ======================================================================

    def start(self):
        '''
        Start the builder.

        The builder functionality is encapsulated within threads run
        by the NodePoolBuilder. This starts the needed sub-threads
        which will run forever until we tell them to stop.
        '''
        with self._start_lock:
            if self._running:
                raise exceptions.BuilderError('Cannot start, already running.')

            self._config = self._getAndValidateConfig()
            self._running = True

            builder_id_file = os.path.join(self._config.imagesdir,
                                           "builder_id.txt")
            builder_id = self._getBuilderID(builder_id_file)

            # All worker threads share a single ZooKeeper instance/connection.
            self.zk = zk.ZooKeeper(enable_cache=False)
            self.zk.connect(list(self._config.zookeeper_servers.values()))

            self.log.debug('Starting listener for build jobs')

            # Create build and upload worker objects
            for i in range(self._num_builders):
                w = BuildWorker(i, builder_id,
                                self._config_path, self._secure_path,
                                self.build_interval, self.zk, self.dib_cmd)
                w.start()
                self._build_workers.append(w)

            for i in range(self._num_uploaders):
                w = UploadWorker(i, builder_id,
                                 self._config_path, self._secure_path,
                                 self.upload_interval, self.zk)
                w.start()
                self._upload_workers.append(w)

            if self.cleanup_interval > 0:
                self._janitor = CleanupWorker(
                    0, builder_id,
                    self._config_path, self._secure_path,
                    self.cleanup_interval, self.zk)
                self._janitor.start()

            # Wait until all threads are running. Otherwise, we have a race
            # on the worker _running attribute if shutdown() is called before
            # run() actually begins.
            workers = self._build_workers + self._upload_workers
            if self._janitor:
                workers += [self._janitor]
            while not all([x.running for x in workers]):
                time.sleep(0)

    def stop(self):
        '''
        Stop the builder.

        Signal the sub threads to begin the shutdown process. We don't
        want this method to return until the scheduler has successfully
        stopped all of its own threads.
        '''
        with self._start_lock:
            self.log.debug("Stopping. NodePoolBuilder shutting down workers")
            # Note we intentionally do not add the upload workers to this
            # list. The reason for this is that uploads can take many hours
            # and there is no good way to stop the blocking writes performed
            # by the uploads in order to join() below in a reasonable amount
            # of time. Killing the process will stop the upload; then both
            # the record in ZK and the image in the cloud will be deleted by
            # any other running builders, or when this builder starts again.
            workers = self._build_workers
            if self._janitor:
                workers += [self._janitor]
            for worker in workers:
                worker.shutdown()

        self._running = False

        self.log.debug('Waiting for jobs to complete')

        # Do not exit until all of our owned threads exit.
        for worker in workers:
            worker.join()

        self.log.debug('Stopping providers')
        provider_manager.ProviderManager.stopProviders(self._config)

        self.log.debug('Terminating ZooKeeper connection')
        self.zk.disconnect()

        self.log.debug('Finished stopping')
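
A minimal usage sketch (not part of builder.py): one way a daemon wrapper might drive NodePoolBuilder, based only on the constructor and start()/stop() methods shown above. The config path and signal handling here are illustrative assumptions, not the project's actual command-line entry point.

# Hypothetical wrapper around NodePoolBuilder, shown for illustration only.
import signal
import threading

from nodepool.builder import NodePoolBuilder


def main():
    # Example paths and worker counts; adjust for a real deployment.
    builder = NodePoolBuilder('/etc/nodepool/nodepool.yaml',
                              secure_path=None,
                              num_builders=1,
                              num_uploaders=4)
    builder.start()

    # Block until SIGINT/SIGTERM, then ask the worker threads to shut down.
    stop_event = threading.Event()

    def _handle(signum, frame):
        stop_event.set()

    signal.signal(signal.SIGINT, _handle)
    signal.signal(signal.SIGTERM, _handle)

    while not stop_event.is_set():
        stop_event.wait(timeout=1)

    builder.stop()


if __name__ == '__main__':
    main()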