Manage a pool of nodes for a distributed test infrastructure
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

handler.py 5.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. # Copyright 2018 Red Hat
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. import logging
  15. import time
  16. from nodepool import exceptions
  17. from nodepool import zk
  18. from nodepool.driver.utils import NodeLauncher
  19. from nodepool.driver import NodeRequestHandler
  20. from nodepool.nodeutils import nodescan
  class AwsInstanceLauncher(NodeLauncher):
      """Launch a single AWS instance for a nodepool node.

      One launcher is created per node by ``AwsNodeRequestHandler.launch``,
      which then calls ``start()`` on it.  The actual work happens in
      :meth:`launch`.
      """

      def __init__(self, handler, node, provider_config, provider_label):
          """Capture the handler, node and provider settings for the launch.

          :param handler: The request handler that owns this launcher; its
              ``zk`` connection and ``manager`` are used during the launch.
          :param node: The node record to populate and store.
          :param provider_config: Provider configuration; supplies
              ``launch_retries``, ``boot_timeout`` and the pool definitions.
          :param provider_label: The label being launched; supplies the pool
              name and the ``cloud_image`` connection settings.
          """
          super().__init__(handler.zk, node, provider_config)
          self.retries = provider_config.launch_retries
          self.pool = provider_config.pools[provider_label.pool.name]
          self.handler = handler
          self.zk = handler.zk
          self.boot_timeout = provider_config.boot_timeout
          self.label = provider_label

      def launch(self):
          """Create the instance, wait for it to run and mark the node READY.

          :raises exceptions.LaunchStatusException: If no instance was
              created, the instance did not reach the ``running`` state
              within ``boot_timeout`` seconds, or it has no public IP.
          :raises exceptions.LaunchKeyscanException: If host key checking is
              enabled for the pool and scanning the instance fails.
          :raises Exception: Whatever ``createInstance`` raised on the final
              launch attempt.
          """
          self.log.debug("Starting %s instance" % self.node.type)

          # Pre-bind so that a retry count below 1 produces a clear error
          # instead of an UnboundLocalError further down.
          instance = None
          attempts = 1
          while attempts <= self.retries:
              try:
                  instance = self.handler.manager.createInstance(self.label)
                  break
              except Exception:
                  self.log.exception(
                      "Launch attempt %d/%d failed for node %s:",
                      attempts, self.retries, self.node.id)
                  if attempts == self.retries:
                      raise
                  attempts += 1
                  time.sleep(1)

          if instance is None:
              raise exceptions.LaunchStatusException(
                  "No instance was created for node %s" % self.node.id)

          instance.create_tags(Tags=[{'Key': 'nodepool_id',
                                      'Value': str(self.node.id)}])
          instance_id = instance.id

          # Record the external id right away so the instance stays
          # associated with the node even if a later step fails.
          self.node.external_id = instance_id
          self.zk.storeNode(self.node)

          # Poll until the instance reports 'running' or the boot timeout
          # elapses.  ``state`` is pre-bound so a non-positive timeout still
          # yields the launch-status error below.
          state = None
          boot_start = time.monotonic()
          while time.monotonic() - boot_start < self.boot_timeout:
              state = instance.state.get('Name')
              self.log.debug("Instance %s is %s" % (instance_id, state))
              if state == 'running':
                  break
              time.sleep(0.5)
              instance.reload()
          if state != 'running':
              raise exceptions.LaunchStatusException(
                  "Instance %s failed to start: %s" % (instance_id, state))

          server_ip = instance.public_ip_address
          if not server_ip:
              raise exceptions.LaunchStatusException(
                  "Instance %s doesn't have a public ip" % instance_id)

          self.node.connection_port = self.label.cloud_image.connection_port
          self.node.connection_type = self.label.cloud_image.connection_type

          # Default to no host keys so the assignment below is valid when the
          # pool has host key checking disabled.
          keys = []
          if self.pool.host_key_checking:
              try:
                  # Host keys are only gathered for ssh connections.
                  gather_hostkeys = self.node.connection_type == 'ssh'
                  keys = nodescan(server_ip, port=self.node.connection_port,
                                  timeout=180, gather_hostkeys=gather_hostkeys)
              except Exception:
                  raise exceptions.LaunchKeyscanException(
                      "Can't scan instance %s key" % instance_id)

          self.log.info("Instance %s ready" % instance_id)
          self.node.state = zk.READY
          self.node.hostname = server_ip
          self.node.interface_ip = server_ip
          self.node.public_ipv4 = server_ip
          self.node.host_keys = keys
          self.node.username = self.label.cloud_image.username
          self.zk.storeNode(self.node)
          self.log.info("Instance %s is ready", instance_id)
  class AwsNodeRequestHandler(NodeRequestHandler):
      '''
      Node request handler for the AWS driver.

      Starts one ``AwsInstanceLauncher`` per node and tracks the launchers
      so that completion of the request can be detected.
      '''

      log = logging.getLogger("nodepool.driver.aws."
                              "AwsNodeRequestHandler")

      def __init__(self, pw, request):
          '''
          :param pw: Passed through unchanged to the base handler.
          :param request: Passed through unchanged to the base handler.
          '''
          super().__init__(pw, request)
          # Launchers started by launch(), in start order.
          self._threads = []

      @property
      def alive_thread_count(self):
          '''
          Number of started launchers that are still running.
          '''
          # Thread.isAlive() was removed in Python 3.9; is_alive() is the
          # supported spelling.
          return sum(1 for t in self._threads if t.is_alive())

      def imagesAvailable(self):
          '''
          Determines if the requested images are available for this provider.

          :returns: True if it is available, False otherwise.
          '''
          if self.provider.manage_images:
              for label in self.request.node_types:
                  pool_label = self.pool.labels[label]
                  if pool_label.cloud_image:
                      if not self.manager.labelReady(pool_label):
                          return False
          return True

      def launchesComplete(self):
          '''
          Check if all launch requests have completed.

          When all of the Node objects have reached a final state (READY or
          FAILED), we'll know all threads have finished the launch process.

          :returns: True if nothing was launched or every node is in a final
              state, False otherwise.
          '''
          if not self._threads:
              return True

          # Give the NodeLaunch threads time to finish.
          if self.alive_thread_count:
              return False

          # NOTE: It is very important that NodeLauncher always sets one of
          # these states, no matter what.
          return all(node.state in (zk.READY, zk.FAILED)
                     for node in self.nodeset)

      def launch(self, node):
          '''
          Start a launcher for the given node and track it.

          :param node: The node to launch; the first entry of ``node.type``
              selects the pool label.
          '''
          label = self.pool.labels[node.type[0]]
          thd = AwsInstanceLauncher(self, node, self.provider, label)
          thd.start()
          self._threads.append(thd)
  134. self._threads.append(thd)