Manage a pool of nodes for a distributed test infrastructure

# Copyright (C) 2011-2014 OpenStack Foundation
# Copyright 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import math
import pprint
import random

from kazoo import exceptions as kze
import openstack

from nodepool import exceptions
from nodepool import nodeutils as utils
from nodepool import zk
from nodepool.driver.utils import NodeLauncher, QuotaInformation
from nodepool.driver import NodeRequestHandler


class OpenStackNodeLauncher(NodeLauncher):
    def __init__(self, handler, node, provider_config, provider_label):
        '''
        Initialize the launcher.

        :param OpenStackNodeRequestHandler handler: The handler object.
        :param Node node: A Node object describing the node to launch.
        :param ProviderConfig provider_config: A ProviderConfig object
            describing the provider launching this node.
        :param ProviderLabel provider_label: A ProviderLabel object
            describing the label to use for the node.
        '''
        super().__init__(handler.zk, node, provider_config)

        # Number of times to retry failed launches.
        self._retries = provider_config.launch_retries

        self.label = provider_label
        self.pool = provider_label.pool
        self.handler = handler
        self.zk = handler.zk

    def _logConsole(self, server_id, hostname):
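        '''
        Log the server console output to aid debugging of failed boots.

        This is a no-op unless console logging is enabled for the label.

        :param server_id: The external ID of the server.
        :param hostname: The hostname of the server, used only for logging.
        '''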
        if not self.label.console_log:
            return
        console = self.handler.manager.getServerConsole(server_id)
        if console:
            self.log.debug('Console log from hostname %s:' % hostname)
            for line in console.splitlines():
                self.log.debug(line.rstrip())

    def _launchNode(self):
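        '''
        Create the server and record its details in ZooKeeper.

        The server is booted from either the most recent upload of the
        label's diskimage to this provider or the label's unmanaged cloud
        image. After the create call returns, the node record is
        checkpointed to ZooKeeper, the server is waited on until it is
        ACTIVE, its addresses are recorded and, if host key checking is
        enabled for the pool, its host keys are scanned.
        '''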
        if self.label.diskimage:
            diskimage = self.provider_config.diskimages[
                self.label.diskimage.name]
        else:
            diskimage = None

        if diskimage:
            # launch using diskimage
            cloud_image = self.handler.zk.getMostRecentImageUpload(
                diskimage.name, self.provider_config.name)

            if not cloud_image:
                raise exceptions.LaunchNodepoolException(
                    "Unable to find current cloud image %s in %s" %
                    (diskimage.name, self.provider_config.name)
                )

            config_drive = diskimage.config_drive

            # Using a dict with the ID bypasses an image search during
            # server creation.
            image_external = dict(id=cloud_image.external_id)

            image_id = "{path}/{upload_id}".format(
                path=self.handler.zk._imageUploadPath(
                    cloud_image.image_name,
                    cloud_image.build_id,
                    cloud_image.provider_name),
                upload_id=cloud_image.id)
            image_name = diskimage.name
            username = cloud_image.username
            connection_type = diskimage.connection_type
            connection_port = diskimage.connection_port
        else:
            # launch using unmanaged cloud image
            config_drive = self.label.cloud_image.config_drive

            if self.label.cloud_image.image_id:
                # Using a dict with the ID bypasses an image search during
                # server creation.
                image_external = dict(id=self.label.cloud_image.image_id)
            else:
                image_external = self.label.cloud_image.external_name

            image_id = self.label.cloud_image.name
            image_name = self.label.cloud_image.name
            username = self.label.cloud_image.username
            connection_type = self.label.cloud_image.connection_type
            connection_port = self.label.cloud_image.connection_port

        hostname = self.provider_config.hostname_format.format(
            label=self.label, provider=self.provider_config, node=self.node
        )

        self.log.info("Creating server with hostname %s in %s from image %s "
                      "for node id: %s" % (hostname,
                                           self.provider_config.name,
                                           image_name,
                                           self.node.id))

        # NOTE: We store the node ID in the server metadata to use for leaked
        # instance detection. We cannot use the external server ID for this
        # because that isn't available in ZooKeeper until after the server is
        # active, which could cause a race in leak detection.

        try:
            server = self.handler.manager.createServer(
                hostname,
                image=image_external,
                min_ram=self.label.min_ram,
                flavor_name=self.label.flavor_name,
                key_name=self.label.key_name,
                az=self.node.az,
                config_drive=config_drive,
                nodepool_node_id=self.node.id,
                nodepool_node_label=self.node.type[0],
                nodepool_image_name=image_name,
                networks=self.pool.networks,
                security_groups=self.pool.security_groups,
                boot_from_volume=self.label.boot_from_volume,
                volume_size=self.label.volume_size,
                instance_properties=self.label.instance_properties,
                userdata=self.label.userdata)
        except openstack.cloud.exc.OpenStackCloudCreateException as e:
            if e.resource_id:
                self.node.external_id = e.resource_id
                # The outer exception handler will handle storing the
                # node immediately after this.
            raise

        self.node.external_id = server.id
        self.node.hostname = hostname
        self.node.image_id = image_id

        pool = self.handler.provider.pools.get(self.node.pool)
        resources = self.handler.manager.quotaNeededByNodeType(
            self.node.type[0], pool)
        self.node.resources = resources.quota['compute']

        if username:
            self.node.username = username

        self.node.connection_type = connection_type
        self.node.connection_port = connection_port

        # Checkpoint save the updated node info
        self.zk.storeNode(self.node)

        self.log.debug("Waiting for server %s for node id: %s" %
                       (server.id, self.node.id))
        server = self.handler.manager.waitForServer(
            server, self.provider_config.launch_timeout,
            auto_ip=self.pool.auto_floating_ip)

        if server.status != 'ACTIVE':
            raise exceptions.LaunchStatusException("Server %s for node id: %s "
                                                   "status: %s" %
                                                   (server.id, self.node.id,
                                                    server.status))

        # If we didn't specify an AZ, set it to the one chosen by Nova.
        # Do this after we are done waiting since AZ may not be available
        # immediately after the create request.
        if not self.node.az:
            self.node.az = server.location.zone

        interface_ip = server.interface_ip
        if not interface_ip:
            self.log.debug(
                "Server data for failed IP: %s" % pprint.pformat(server))
            raise exceptions.LaunchNetworkException(
                "Unable to find public IP of server")

        self.node.host_id = server.host_id
        self.node.interface_ip = interface_ip
        self.node.public_ipv4 = server.public_v4
        self.node.public_ipv6 = server.public_v6
        self.node.private_ipv4 = server.private_v4
        # devstack-gate multi-node depends on private_v4 being populated
        # with something. On clouds that don't have a private address, use
        # the public.
        if not self.node.private_ipv4:
            self.node.private_ipv4 = server.public_v4

        # Checkpoint save the updated node info
        self.zk.storeNode(self.node)

        self.log.debug(
            "Node %s is running [region: %s, az: %s, ip: %s ipv4: %s, "
            "ipv6: %s, hostid: %s]" %
            (self.node.id, self.node.region, self.node.az,
             self.node.interface_ip, self.node.public_ipv4,
             self.node.public_ipv6, self.node.host_id))

        # wait and scan the new node and record in ZooKeeper
        host_keys = []
        if self.pool.host_key_checking:
            try:
                self.log.debug(
                    "Gathering host keys for node %s", self.node.id)
                # only gather host keys if the connection type is ssh
                gather_host_keys = connection_type == 'ssh'
                host_keys = utils.nodescan(
                    interface_ip,
                    timeout=self.provider_config.boot_timeout,
                    gather_hostkeys=gather_host_keys,
                    port=connection_port)

                if gather_host_keys and not host_keys:
                    raise exceptions.LaunchKeyscanException(
                        "Unable to gather host keys")
            except exceptions.ConnectionTimeoutException:
                self._logConsole(self.node.external_id, self.node.hostname)
                raise

        self.node.host_keys = host_keys
        self.zk.storeNode(self.node)

    def launch(self):
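        '''
        Launch the node, retrying on failure.

        _launchNode() is attempted up to launch_retries times. After a
        failed attempt, any partially created instance is marked for
        cleanup and the node record is reset before retrying. Quota
        errors and an expired ZooKeeper session abort the retries
        immediately. On success the node is stored in the READY state.
        '''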
        attempts = 1
        while attempts <= self._retries:
            try:
                self._launchNode()
                break
            except kze.SessionExpiredError:
                # If we lost our ZooKeeper session, we've lost our node lock
                # so there's no need to continue.
                raise
            except Exception as e:
                if attempts <= self._retries:
                    self.log.exception(
                        "Request %s: Launch attempt %d/%d failed for node %s:",
                        self.handler.request.id, attempts,
                        self._retries, self.node.id)

                # If we created an instance, delete it.
                if self.node.external_id:
                    deleting_node = zk.Node()
                    deleting_node.provider = self.node.provider
                    deleting_node.pool = self.node.pool
                    deleting_node.type = self.node.type
                    deleting_node.external_id = self.node.external_id
                    deleting_node.state = zk.DELETING
                    self.zk.storeNode(deleting_node)
                    self.log.info(
                        "Request %s: Node %s scheduled for cleanup",
                        self.handler.request.id, deleting_node.external_id)
                    self.node.external_id = None
                    self.node.public_ipv4 = None
                    self.node.public_ipv6 = None
                    self.node.interface_ip = None
                    self.zk.storeNode(self.node)

                if attempts == self._retries:
                    raise

                if 'quota exceeded' in str(e).lower():
                    # A quota exception is not directly recoverable so bail
                    # out immediately with a specific exception.
                    self.log.info("Quota exceeded, invalidating quota cache")
                    self.handler.manager.invalidateQuotaCache()
                    raise exceptions.QuotaException("Quota exceeded")
                attempts += 1

        self.node.state = zk.READY
        self.zk.storeNode(self.node)
        self.log.info("Node id %s is ready", self.node.id)


class OpenStackNodeRequestHandler(NodeRequestHandler):

    def __init__(self, pw, request):
        super().__init__(pw, request)
        self.chosen_az = None
        self._threads = []

    @property
    def alive_thread_count(self):
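        '''
        Return the number of launcher threads that are still running.
        '''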
        count = 0
        for t in self._threads:
            if t.is_alive():
                count += 1
        return count

    def imagesAvailable(self):
        '''
        Determines if the requested images are available for this provider.

        ZooKeeper is queried for an image uploaded to the provider that is
        in the READY state.

        :returns: True if it is available, False otherwise.
        '''
        if self.provider.manage_images:
            for label in self.request.node_types:
                if self.pool.labels[label].cloud_image:
                    if not self.manager.labelReady(self.pool.labels[label]):
                        return False
                else:
                    if not self.zk.getMostRecentImageUpload(
                            self.pool.labels[label].diskimage.name,
                            self.provider.name):
                        return False
        return True

    def hasRemainingQuota(self, ntype):
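        '''
        Check if there is enough quota left to launch one node of this type.

        Unless the pool is configured to ignore provider quota, the
        estimated remaining provider quota is checked first; the pool's
        max-cores, max-servers and max-ram limits are checked afterwards.

        :param str ntype: The node type (label) to check.

        :returns: True if the node fits into the remaining quota,
            False otherwise.
        '''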
        needed_quota = self.manager.quotaNeededByNodeType(ntype, self.pool)

        if not self.pool.ignore_provider_quota:
            # Calculate remaining quota which is calculated as:
            # quota = <total nodepool quota> - <used quota> - <quota for node>
            cloud_quota = self.manager.estimatedNodepoolQuota()
            cloud_quota.subtract(
                self.manager.estimatedNodepoolQuotaUsed())
            cloud_quota.subtract(needed_quota)
            self.log.debug("Predicted remaining provider quota: %s",
                           cloud_quota)

            if not cloud_quota.non_negative():
                return False

        # Now calculate pool specific quota. Values indicating no quota default
        # to math.inf representing infinity that can be calculated with.
        pool_quota = QuotaInformation(cores=self.pool.max_cores,
                                      instances=self.pool.max_servers,
                                      ram=self.pool.max_ram,
                                      default=math.inf)
        pool_quota.subtract(
            self.manager.estimatedNodepoolQuotaUsed(self.pool))
        self.log.debug("Current pool quota: %s" % pool_quota)
        pool_quota.subtract(needed_quota)
        self.log.debug("Predicted remaining pool quota: %s", pool_quota)

        return pool_quota.non_negative()

    def hasProviderQuota(self, node_types):
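        '''
        Check if the provider and pool could ever satisfy this request.

        Unlike hasRemainingQuota(), this compares the quota needed by all
        of the requested node types against the total estimated provider
        quota and the pool limits, without subtracting current usage.

        :param node_types: A list of node types (labels) in the request.

        :returns: True if the request could fit, False otherwise.
        '''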
        needed_quota = QuotaInformation()

        for ntype in node_types:
            needed_quota.add(
                self.manager.quotaNeededByNodeType(ntype, self.pool))

        if not self.pool.ignore_provider_quota:
            cloud_quota = self.manager.estimatedNodepoolQuota()
            cloud_quota.subtract(needed_quota)

            if not cloud_quota.non_negative():
                return False

        # Now calculate pool specific quota. Values indicating no quota default
        # to math.inf representing infinity that can be calculated with.
        pool_quota = QuotaInformation(cores=self.pool.max_cores,
                                      instances=self.pool.max_servers,
                                      ram=self.pool.max_ram,
                                      default=math.inf)
        pool_quota.subtract(needed_quota)

        return pool_quota.non_negative()

    def checkReusableNode(self, node):
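        '''
        Only reuse a ready node if it is in the chosen availability zone.
        '''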
        if self.chosen_az and node.az != self.chosen_az:
            return False
        return True

    def nodeReusedNotification(self, node):
        """
        We attempt to group the node set within the same provider availability
        zone.

        For this to work properly, the provider entry in the nodepool
        config must list the availability zones. Otherwise, new nodes will be
        put in random AZs at nova's whim. The exception being if there is an
        existing node in the READY state that we can select for this node set.
        Its AZ will then be used for new nodes, as well as any other READY
        nodes.
        """
        # If we haven't already chosen an AZ, select the
        # AZ from this ready node. This will cause new nodes
        # to share this AZ, as well.
        if not self.chosen_az and node.az:
            self.chosen_az = node.az

    def setNodeMetadata(self, node):
        """
        Select grouping AZ if we didn't set AZ from a selected,
        pre-existing node
        """
        if not self.chosen_az:
            self.chosen_az = random.choice(
                self.pool.azs or self.manager.getAZs())
        node.az = self.chosen_az
        node.cloud = self.provider.cloud_config.name
        node.region = self.provider.region_name

    def launchesComplete(self):
        '''
        Check if all launch requests have completed.

        When all of the Node objects have reached a final state (READY, FAILED
        or ABORTED), we'll know all threads have finished the launch process.
        '''
        if not self._threads:
            return True

        # Give the NodeLaunch threads time to finish.
        if self.alive_thread_count:
            return False

        node_states = [node.state for node in self.nodeset]

        # NOTE: It's very important that NodeLauncher always sets one of
        # these states, no matter what.
        if not all(s in (zk.READY, zk.FAILED, zk.ABORTED)
                   for s in node_states):
            return False

        return True

    def launch(self, node):
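        '''
        Start an OpenStackNodeLauncher thread for the given node and keep
        track of it so launchesComplete() can poll for completion.
        '''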
        label = self.pool.labels[node.type[0]]
        thd = OpenStackNodeLauncher(self, node, self.provider, label)
        thd.start()
        self._threads.append(thd)